Commit 5a11b366 authored by Sebastien Petitdemange's avatar Sebastien Petitdemange
Browse files

libdeep: removed

parent f19a1ca0
"""Handle communication with any Deep or IcePAP device"""
__all__ = ['device', 'utils', 'cmd', 'specwriter']
#__all__ = ['device', 'utils', 'cmd', 'specwriter', 'xnap']
#from deep.utils import *
#from deep.device import *
#from deep.cmd import *
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import string
from inspect import getmembers, ismethod
from deep.device import *
__all__ = ["DeepCmd"]
"""A generic class to build line-oriented command interpreters.
Interpreters constructed with this class obey the following conventions:
1. End of file on input is processed as the command 'EOF'.
2. A command is parsed out of each line by collecting the prefix composed
of characters in the identchars member.
3. A command `foo' is dispatched to a method 'do_foo()'; the do_ method
is passed a single argument consisting of the remainder of the line.
4. Typing an empty line repeats the last command. (Actually, it calls the
method `emptyline', which may be overridden in a subclass.)
5. There is a predefined `help' method. Given an argument `topic', it
calls the command `help_topic'. With no arguments, it lists all topics
with defined help_ functions, broken into up to three topics; documented
commands, miscellaneous help topics, and undocumented commands.
6. The command '?' is a synonym for `help'. The command '!' is a synonym
for `shell', if a do_shell method exists.
7. If completion is enabled, completing commands will be done automatically,
and completing of commands args is done by calling complete_foo() with
arguments text, line, begidx, endidx. text is string we are matching
against, all returned matches must begin with it. line is the current
input line (lstripped), begidx and endidx are the beginning and end
indexes of the text being matched, which could be used to provide
different completion depending upon which position the argument is in.
The `default' method may be overridden to intercept commands for which there
is no do_ method.
The `completedefault' method may be overridden to intercept completions for
commands that have no complete_ method.
The data member `self.ruler' sets the character used to draw separator lines
in the help messages. If empty, no ruler line is drawn. It defaults to "=".
If the value of `self.intro' is nonempty when the cmdloop method is called,
it is printed out on interpreter startup. This value may be overridden
via an optional argument to the cmdloop() method.
The data members `self.doc_header', `self.misc_header', and
`self.undoc_header' set the headers used for the help function's
listings of documented functions, miscellaneous topics, and undocumented
functions respectively.
These interpreters use raw_input; thus, if the readline module is loaded,
they automatically support Emacs-like command history and editing features.
"""
PROMPT = '(DeepCmd) '
IDENTCHARS = string.ascii_letters + string.digits + '_*'
class DeepCmd():
"""A simple framework for writing line-oriented command interpreters.
These are often useful for test harnesses, administrative tools, and
prototypes that will later be wrapped in a more sophisticated interface.
A Cmd instance or subclass instance is a line-oriented interpreter
framework. There is no good reason to instantiate Cmd itself; rather,
it's useful as a superclass of an interpreter class you define yourself
in order to inherit Cmd's methods and encapsulate action methods.
"""
prompt = PROMPT
identchars = IDENTCHARS
ruler = '='
lastcmd = ''
intro = None
doc_leader = ""
doc_header = "Documented commands (type .help <topic>):"
misc_header = "Miscellaneous help topics:"
undoc_header = "Undocumented commands:"
nohelp = "*** No help on %s"
use_rawinput = 1
def __init__(self, device, completekey='tab', stdin=None, stdout=None):
"""Instantiate aline-oriented interpreter framework for Deep devices.
The optional argument 'completekey' is the readline name of a
completion key; it defaults to the Tab key. If completekey is
not None and the readline module is available, command completion
is done automatically. The optional arguments stdin and stdout
specify alternate input and output file objects; if not specified,
sys.stdin and sys.stdout are used.
"""
import sys
if stdin is not None:
self.stdin = stdin
else:
self.stdin = sys.stdin
if stdout is not None:
self.stdout = stdout
else:
self.stdout = sys.stdout
self.cmdqueue = []
self.completekey = completekey
if not isinstance(device, DeepDevice):
raise Exception
else:
self.device = device
self.devcomm = device.commands
self.mycomm = self.extract_mynames()
def precmd(self, line):
"""Hook method executed just before the command line is
interpreted, but after the input prompt is generated and issued.
"""
return line
def postcmd(self, stop, line):
"""Hook method executed just after a command dispatch is finished."""
if line != "":
self.cmd_n += 1
self.buildprompt()
return stop
def preloop(self):
"""Hook method executed once when the cmdloop() method is called."""
self.cmd_n = 1
self.buildprompt()
def postloop(self):
"""Hook method executed once when the cmdloop() method is about to
return.
"""
pass
def buildprompt(self):
"""Builds the complete prompt"""
self.prompt = "\n" + str(self.cmd_n) + "." + self.newprompt()
def newprompt(self):
"""Returns the current prompt string"""
return "deep >> "
def processdevcmd(self, cmd):
""""..."""
i = len(cmd) if not " " in cmd else str.index(cmd, " ")
cmd = cmd[:i].upper() + cmd[i:]
try:
print "===>", cmd
answer = self.device.ackcommand(cmd)
print answer
except DeviceError, errmsg:
print "Error:", errmsg
except:
raise
def do__nothing(self, arg):
return False
def do_quit(self, arg):
"""..."""
print
print "Exit ..."
return True
def help_help(self):
print "toma help ..."
return False
def extract_mynames(self):
return [a[0] for a in getmembers(self, ismethod) \
if (a[0].startswith("do_") and a[0][3] != "_")]
def get_names(self):
return self.mycomm
def parseline(self, line, complete = False):
"""Parse the line into a command name and a string containing
the arguments. Returns a tuple containing (command, args, line).
'command' and 'args' may be None if the line couldn't be parsed.
"""
line = line.strip()
if not line:
return None, None, line
elif line.startswith('? ') or (line == '?' and not complete):
line = '.help' + line[1:]
elif line[0] == '!':
if hasattr(self, 'do_shell'):
line = '.shell ' + line[1:]
else:
return None, None, line
i, n = 0, len(line)
if line[0] == '.' or line[0] == '?': i = 1
while i < n and line[i] in self.identchars: i = i+1
if i < n and not line[i].isspace():
return None, None, line
else:
cmd, arg = line[:i], line[i:].strip()
return cmd, arg, line
def onecmd(self, line):
"""Interpret the argument as though it had been typed in response
to the prompt.
This may be overridden, but should not normally need to be;
see the precmd() and postcmd() methods for useful execution hooks.
The return value is a flag indicating whether interpretation of
commands by the interpreter should stop.
"""
cmd, arg, line = self.parseline(line)
if not line:
return self.emptyline()
if cmd is None:
return self.default(line)
self.lastcmd = line
if cmd == '':
return self.default(line)
elif cmd[0] == '.':
try:
func = getattr(self, 'do_' + cmd[1:])
except AttributeError:
return self.default(line)
try:
return func(arg)
except KeyboardInterrupt:
return False
elif self.device.isvalidcommand(cmd):
if cmd.find('*') < 0:
self.processdevcmd(line)
else:
print "Cannot execute binary transfer from the command line"
return False
else:
print "executing (eval): ", line
try:
print eval(line)
except Exception as e:
print "Python exception:", e
return False
def emptyline(self):
"""Called when an empty line is entered in response to the prompt.
If this method is not overridden, it repeats the last nonempty
command entered.
"""
#if self.lastcmd:
# return self.onecmd(self.lastcmd)
def default(self, line):
"""Called on an input line when the command prefix is not recognized.
If this method is not overridden, it prints an error message and
returns.
"""
self.stdout.write('*** Unknown syntax: %s\n'%line)
def completedefault(self, *ignored):
"""Method called to complete an input line when no command-specific
complete_*() method is available.
By default, it returns an empty list.
"""
#print "completedefault()", ignored
return []
def completenames(self, text, *ignored):
#print "completenames()", text, ignored
list = []
if text == "" or text[0] == '.':
dotext = 'do_' + text[1:]
list.extend(['.'+a[3:] for a in self.get_names() \
if a.startswith(dotext)])
if text == "" or text[0] != '.':
text = text.upper()
list.extend([a for a in self.devcomm if a.startswith(text)])
return(list)
def complete(self, text, state):
"""Return the next possible completion for 'text'.
If a command has not been entered, then complete against command list.
Otherwise try to call complete_<command> to get list of completions.
"""
#print "complete()", text, "-", state
if state == 0:
import readline
origline = readline.get_line_buffer()
line = origline.lstrip()
stripped = len(origline) - len(line)
begidx = readline.get_begidx() - stripped
endidx = readline.get_endidx() - stripped
#print "-->", line, begidx, endidx
if begidx>0:
cmd, args, foo = self.parseline(line, True)
if cmd == '':
compfunc = self.completedefault
else:
try:
compfunc = getattr(self, 'complete_' + cmd)
except AttributeError:
compfunc = self.completedefault
else:
compfunc = self.completenames
self.completion_matches = compfunc(text, line, begidx, endidx)
try:
return self.completion_matches[state]
except IndexError:
return None
def cmdloop(self, intro=None):
"""Repeatedly issue a prompt, accept input, parse an initial prefix
off the received input, and dispatch to action methods, passing them
the remainder of the line as argument.
"""
self.preloop()
if self.use_rawinput and self.completekey:
try:
import readline
self.old_completer = readline.get_completer()
readline.set_completer(self.complete)
readline.set_completer_delims(" ")
readline.parse_and_bind(self.completekey+": complete")
except ImportError:
pass
try:
if intro is not None:
self.intro = intro
if self.intro:
self.stdout.write(str(self.intro)+"\n")
stop = None
while not stop:
if self.cmdqueue:
line = self.cmdqueue.pop(0)
else:
if self.use_rawinput:
try:
line = raw_input(self.prompt)
except KeyboardInterrupt:
print
line = ""
except EOFError:
print
line = '.quit'
else:
self.stdout.write(self.prompt)
self.stdout.flush()
line = self.stdin.readline()
if not len(line):
line = '.quit'
else:
line = line[:-1] # chop \n
line = self.precmd(line)
stop = self.onecmd(line)
stop = self.postcmd(stop, line)
self.postloop()
finally:
if self.use_rawinput and self.completekey:
try:
import readline
readline.set_completer(self.old_completer)
except ImportError:
pass
def complete_help(self, *args):
print "complete_help()"
return self.completenames(*args)
def do_help(self, arg):
if arg:
# XXX check arg syntax
try:
func = getattr(self, 'help_' + arg)
except AttributeError:
try:
doc=getattr(self, 'do_' + arg).__doc__
if doc:
self.stdout.write("%s\n"%str(doc))
return
except AttributeError:
pass
self.stdout.write("%s\n"%str(self.nohelp % (arg,)))
return
func()
else:
names = self.get_names()
cmds_doc = []
cmds_undoc = []
help = {}
for name in names:
if name[:5] == 'help_':
help[name[5:]]=1
names.sort()
# There can be duplicates if routines overridden
prevname = ''
for name in names:
if name[:3] == 'do_':
if name == prevname:
continue
prevname = name
cmd=name[3:]
if cmd in help:
cmds_doc.append(cmd)
del help[cmd]
elif getattr(self, name).__doc__:
cmds_doc.append(cmd)
else:
cmds_undoc.append(cmd)
self.stdout.write("%s\n"%str(self.doc_leader))
self.print_topics(self.doc_header, cmds_doc, 15,80)
self.print_topics(self.misc_header, help.keys(),15,80)
self.print_topics(self.undoc_header, cmds_undoc, 15,80)
def print_topics(self, header, cmds, cmdlen, maxcol):
if cmds:
self.stdout.write("%s\n"%str(header))
if self.ruler:
self.stdout.write("%s\n"%str(self.ruler * len(header)))
self.columnize(cmds, maxcol-1)
self.stdout.write("\n")
def columnize(self, list, displaywidth=80):
"""Display a list of strings as a compact set of columns.
Each column is only as wide as necessary.
Columns are separated by two spaces (one was not legible enough).
"""
if not list:
self.stdout.write("<empty>\n")
return
nonstrings = [i for i in range(len(list))
if not isinstance(list[i], str)]
if nonstrings:
raise TypeError, ("list[i] not a string for i in %s" %
", ".join(map(str, nonstrings)))
size = len(list)
if size == 1:
self.stdout.write('%s\n'%str(list[0]))
return
# Try every row count from 1 upwards
for nrows in range(1, len(list)):
ncols = (size+nrows-1) // nrows
colwidths = []
totwidth = -2
for col in range(ncols):
colwidth = 0
for row in range(nrows):
i = row + nrows*col
if i >= size:
break
x = list[i]
colwidth = max(colwidth, len(x))
colwidths.append(colwidth)
totwidth += colwidth + 2
if totwidth > displaywidth:
break
if totwidth <= displaywidth:
break
else:
nrows = len(list)
ncols = 1
colwidths = [0]
for row in range(nrows):
texts = []
for col in range(ncols):
i = row + nrows*col
if i >= size:
x = ""
else:
x = list[i]
texts.append(x)
while texts and not texts[-1]:
del texts[-1]
for col in range(len(texts)):
texts[col] = texts[col].ljust(colwidths[col])
self.stdout.write("%s\n"%str(" ".join(texts)))
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Handle communication with any Deep or IcePAP device"""
# Standard modules
import string
import time
import numpy
import sys
import pdb
from threading import Lock #import gevent
# DEEP modules
from . import log
# Get python modules to communicate with an DEEP device
from sockdeep import SockDeep
from sldeep import SLDeep
# End of Command character and other special ones
COMM_EOL = "\n"
COMM_ACK = "#"
COMM_REQ = "?"
COMM_ADR = ":"
COMM_BIN = "*"
COMM_MLI = "$"
# Device long answer timeout in seconds
COMM_LONG_TIMEOUT = 20
# Device generic command
COMM_ALIVE_CMD = "?PING"
COMM_ALIVE_ICECMD = "?_SOCKPING"
COMM_ALIVE_ANS = "OK"
# Binary protocol
BIN_HEAD_SIGNATURE = 0xa5a50000
BIN_HEAD_ICESIGNATURE = 0xa5aa555a
BIN_HEAD_SIGNMASK = 0xffff0000
BIN_HEAD_NOCHECKSUM = 0x00000010
BIN_HEAD_BIG_ENDIAN = 0x00000020
BIN_HEAD_UNITMASK = 0x0000000f
# Binary value type given in bytes per single value
BIN_8 = 1
BIN_16 = 2
BIN_32 = 4
BIN_64 = 8
#
#
#
class DeviceError(Exception):
#
#
def __init__(self, device, message):
Exception.__init__(self, device.hostname() + ': ' + message)
#
#
#
class DeepDevice(object):
_icepapmode = False
#
#
def __init__(self, dev, argin_str="", timeout=None):
# parse options given at object creation
argins = string.split(argin_str)
for argin in argins:
try: