Commit 4cf3067d authored by Jose Tiago Macara Coutinho's avatar Jose Tiago Macara Coutinho Committed by Jose Tiago Macara Coutinho
Browse files

shell: customizable bliss shell

parent d48ddd3c
......@@ -53,32 +53,6 @@ def initialize(*session_names):
return user_ns,sessions
REPL_CONFIGS = []
def repl_config(func):
"""
Register decorated function to be called by ptpython's configure.
Here is an example on how to do it in your setup file::
from bliss.shell import repl_config
@repl_config
def configure(repl):
# Use the classic prompt. (Display '>>>' instead of 'In [1]'.)
repl.prompt_style = 'classic' # 'classic' or 'ipython'
Args:
func (callable): a callable with one argument: the repl
Returns:
the same func callable
"""
global REPL_CONFIGS
if func not in REPL_CONFIGS:
REPL_CONFIGS.append(func)
return func
def _find_unit(obj):
try:
if hasattr(obj, 'unit'):
......@@ -197,7 +171,7 @@ class ScanListener:
line = " ".join([self.col_templ[i].format(v) for i, v in enumerate(values)])
if self.term.is_a_tty:
monitor = scan_info.get('output_mode', 'tail') == 'monitor'
print_(line, end=monitor and '\r' or '\n', flush=True)
print_('\r' + line, end=monitor and '\r' or '\n', flush=True)
else:
print_(line)
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from .repl import *
from .main import *
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Bliss shell"""
from .repl import embed
embed()
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Bliss ESRF machine status bar"""
from os import environ
from datetime import timedelta
from collections import namedtuple
import tango
import tango.gevent
from prompt_toolkit.token import Token
from bliss.session.session import get_default as default_session
from .layout import StatusToken, Separator
BEAMLINE = environ.get('BEAMLINENAME', 'ID99')
BEAMLINE_TYPE, BEAMLINE_NUMBER = '', '00'
for i, c in enumerate(BEAMLINE):
if c.isdigit():
BEAMLINE_TYPE = BEAMLINE[:i]
BEAMLINE_NUMBER = BEAMLINE[i:]
break
FE_DEVICE = 'orion:10000/fe/{0}/{1}'.format(BEAMLINE_TYPE, BEAMLINE_NUMBER)
ID_DEVICE = 'orion:10000/id/id/{0}'.format(BEAMLINE_NUMBER)
Attribute = namedtuple('Attribute', 'label attr_name unit display')
QMAP = {
tango.AttrQuality.ATTR_VALID: StatusToken.Ok,
tango.AttrQuality.ATTR_WARNING: StatusToken.Warning,
tango.AttrQuality.ATTR_ALARM: StatusToken.Alarm,
tango.AttrQuality.ATTR_CHANGING: StatusToken.Changing,
}
def tango_value(attr, value):
if value is None or value.has_failed or value.is_empty or \
value.quality == tango.AttrQuality.ATTR_INVALID:
token, v = StatusToken.Error, '-----'
elif attr.display is None:
token, v = QMAP[value.quality], value.value
else:
token, v = attr.display(value)
return token, v
class DeviceStatus(object):
attributes = ()
def __init__(self, device, attributes=None):
if attributes is not None:
self.attributes = attributes
self.device = tango.gevent.DeviceProxy(device)
def __call__(self, cli):
n = len(self.attributes)
try:
values = self.device.read_attributes([a.attr_name
for a in self.attributes])
except Exception as e:
values = n*[None]
result = []
for i, (attr, value) in enumerate(zip(self.attributes, values)):
if i > 0:
result.append(Separator)
token, value = tango_value(attr, value)
if cli.python_input.bliss_bar_format != 'compact':
result.append((StatusToken, attr.label))
value = '{0}{1}'.format(value, attr.unit)
result.append((token, value))
return result
class FEStatus(DeviceStatus):
def decode_fe_state(value):
lvalue = value.value.lower()
if 'open' in lvalue:
return Token.Toolbar.Status.Open, 'OPEN'
elif 'close' in lvalue:
return Token.Toolbar.Status.Close, 'CLOSED'
elif 'fault' in lvalue:
return Token.Toolbar.Status.Error, 'FAULT'
return QMAP[value.quality], value.value
current = Attribute('SRCurr: ', 'SR_Current', 'mA',
lambda x: (QMAP[x.quality],
'{0:07.3f}'.format(x.value)))
lifetime = Attribute('Lifetime: ', 'SR_Lifetime', '',
lambda x: (QMAP[x.quality],
str(timedelta(seconds=max(x.value, 0)))))
mode = Attribute('Mode: ', 'SR_Filling_Mode', '', None)
refill = Attribute('Refill in ', 'SR_Refill_Countdown', '',
lambda x: (QMAP[x.quality],
str(timedelta(seconds=max(x.value, 0)))))
state = Attribute('FE: ', 'FE_State', '', decode_fe_state)
message = Attribute('', 'SR_Operator_Mesg', '', None)
attributes = current, lifetime, mode, refill, state, message
def __init__(self, device=FE_DEVICE, **kwargs):
super(FEStatus, self).__init__(device, **kwargs)
class IDStatus(DeviceStatus):
def __init__(self, device=ID_DEVICE, **kwargs):
super(IDStatus, self).__init__(device, **kwargs)
session = default_session()
if session:
name = ' ' + session.name.upper()
else:
name = ''
self.title = u'ESRF-{beamline}{session}'.format(beamline=BEAMLINE,
session=name)
def __call__(self, cli):
if cli.python_input.bliss_sessions:
session = ' ' + cli.python_input.bliss_sessions[0].name.upper()
else:
session = ''
return [(Token.Toolbar.Status.Name, self.title), Separator] + \
super(IDStatus, self).__call__(cli)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
......@@ -6,43 +5,27 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Usage: bliss [--log-level=<log_level>] [(-s | --session)] <name>...
bliss [--show-sessions]
bliss
bliss (-h | --help)
Options:
--log-level=<log_level> Log level [default: WARN].
--show-sessions Display available sessions.
-s, --session Starts with some session(s).
-h, --help Show this screen.
"""
import sys
__all__ = ['PosixGeventEventLoop']
import os
import logging
import gevent
import docopt
from ptpython import repl
from ptpython.prompt_style import PromptStyle
from prompt_toolkit.eventloop.base import EventLoop, INPUT_TIMEOUT
from gevent import select
from prompt_toolkit.utils import DummyContext
from prompt_toolkit.terminal.vt100_input import InputStream
from prompt_toolkit.eventloop.posix_utils import PosixStdinReader
from prompt_toolkit.eventloop.posix import call_on_sigwinch, DummyContext, in_main_thread
from prompt_toolkit.eventloop.posix import call_on_sigwinch, in_main_thread
from prompt_toolkit.eventloop.select import fd_to_int
from prompt_toolkit.token import Token
from gevent import select
import time
import signal
import functools
from bliss.shell import initialize, ScanListener, REPL_CONFIGS
from bliss.config import static
from prompt_toolkit.eventloop.posix_utils import PosixStdinReader
from prompt_toolkit.eventloop.base import EventLoop, INPUT_TIMEOUT
class GeventEventLoop(EventLoop):
class PosixGeventEventLoop(EventLoop):
def __init__(self, *args, **kwargs):
super(EventLoop, self).__init__()
self.readers = dict()
self._running = True
self._schedule_pipe_read,self._schedule_pipe_write = os.pipe()
self._schedule_pipe_read, self._schedule_pipe_write = os.pipe()
self._calls_from_executor = list()
self._callbacks = None
self._winch_callback_done = True
......@@ -166,129 +149,6 @@ class GeventEventLoop(EventLoop):
callback()
self._calls_from_executor.append(postpone)
try:
os.write(self._schedule_pipe_write,'x')
os.write(self._schedule_pipe_write, 'x')
except (AttributeError, IndexError, OSError):
pass
class BlissPrompt(PromptStyle):
"""
A prompt resembling the IPython prompt.
To activate it, simply set it in the shell configure method:
def configure(repl):
repl.prompt_style = 'bliss'
"""
def __init__(self, python_input, prompt_name):
self.python_input = python_input
self.prompt_name = prompt_name
def in_tokens(self, cli):
return [
(Token.In, self.prompt_name),
(Token.In, ' ['),
(Token.In.Number, '%s' % self.python_input.current_statement_index),
(Token.In, ']: '),
]
def in2_tokens(self, cli, width):
return [
(Token.In, '...: '.rjust(width)),
]
def out_tokens(self, cli):
return [
(Token.Out, '{0:>{width}}'.format('Out', width=len(self.prompt_name))),
(Token.Out, ' ['),
(Token.Out.Number, '%s' % self.python_input.current_statement_index),
(Token.Out, ']:'),
(Token, ' '),
]
CURRENT_TASK = None
def main():
try:
# Parse arguments, use file docstring as a parameter definition
arguments = docopt.docopt(__doc__)
sessions_name = arguments['<name>']
except docopt.DocoptExit as e:
print e.message
else:
log_level = getattr(logging, arguments['--log-level'].upper())
fmt = '%(levelname)s %(asctime)-15s %(name)s: %(message)s'
logging.basicConfig(level=log_level, format=fmt)
if arguments['--show-sessions']:
config = static.get_config()
print 'Session name(s):'
for name in config.names_list:
c = config.get_config(name)
if c.get('class') != 'Session': continue
if c.get_inherited('plugin') != 'session': continue
print ' '*4,name
exit(0)
repl.create_eventloop = GeventEventLoop
scan_listener = ScanListener()
user_ns, sessions = initialize(*sessions_name)
if sessions_name:
session_id = '_'.join(sessions_name)
session_title = u'Bliss shell ({0})'.format(', '.join(sessions_name))
history_filename = ".%s_%s_history" % (os.path.basename(sys.argv[0]), session_id)
prompt_name = sessions_name[0].upper()
else:
session_id = 'unnamed'
session_title = u'Bliss shell'
history_filename = ".%s_history" % os.path.basename(sys.argv[0])
prompt_name = 'BLISS'
history_filename = os.path.join(os.environ["HOME"], history_filename)
def patch_repl(repl):
repl.get_globals()['REPL'] = repl
prev_execute = repl._execute
def wrapped_f(self,*args, **kwargs):
try:
return prev_execute(self,*args,**kwargs)
except:
return sys.exc_info()
def patched_execute(*args, **keys):
global CURRENT_TASK
CURRENT_TASK = gevent.spawn(wrapped_f, *args, **keys)
try:
try:
return_value = CURRENT_TASK.get()
if isinstance(return_value, tuple) and len(return_value) >= 3:
if isinstance(return_value[1], (BaseException, Exception)):
raise return_value[0], return_value[1], return_value[2]
except gevent.Timeout as e: #gevent.Timeout doesn't inherit from Exception.
repl._handle_exception(*args)
finally:
CURRENT_TASK = None
repl._execute = patched_execute
repl.all_prompt_styles['bliss'] = BlissPrompt(repl, prompt_name)
for configure in REPL_CONFIGS:
try:
configure(repl)
except:
sys.excepthook(*sys.exc_info())
def stop_current_task(signum, frame, exception=gevent.GreenletExit):
if CURRENT_TASK:
CURRENT_TASK.kill(block=False, exception=exception)
signal.signal(signal.SIGINT, functools.partial(stop_current_task, exception=KeyboardInterrupt))
signal.signal(signal.SIGTERM, stop_current_task)
repl.embed(user_ns, None, vi_mode=False, history_filename=history_filename,
configure=patch_repl, title=session_title)
if __name__ == '__main__':
main()
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Bliss command line interface"""
import gevent
from ptpython.python_input import PythonCommandLineInterface
__all__ = ('BlissCommandLineInterface',)
class BlissCommandLineInterface(PythonCommandLineInterface):
"""A python command line interface with a refresh loop"""
def __init__(self, *args, **kwargs):
self._refresh_interval = kwargs.pop('refresh_interval', None)
self.python_input = kwargs['python_input']
super(BlissCommandLineInterface, self).__init__(*args, **kwargs)
self._refresh_task = None
if self._refresh_interval:
self.on_start += self._start_refresh_loop
self.on_stop += self._stop_refresh_loop
@staticmethod
def _start_refresh_loop(cli):
cli._refresh_task = gevent.spawn(cli._refresh)
@staticmethod
def _stop_refresh_loop(cli):
if cli._refresh_task:
cli._refresh_task.kill()
def _refresh(self):
while self._refresh_interval:
self.invalidate()
gevent.sleep(self._refresh_interval)
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Bliss status bar"""
import functools
import gevent
from prompt_toolkit.token import Token
from prompt_toolkit.layout.toolbars import TokenListToolbar
from prompt_toolkit.filters import IsDone, RendererHeightIsKnown, Condition
from prompt_toolkit.layout.screen import Char
from bliss.common.axis import Axis
from bliss.config.static import get_config
__all__ = ('status_bar', 'AxisStatus', 'StatusToken', 'Separator')
StatusToken = Token.Toolbar.Status
Separator = StatusToken, ' | '
class StatusToolbar(TokenListToolbar):
"""
Bliss status toolbar.
Example on how to add items in your setup::
from bliss.shell.cli import configure
from bliss.shell.layout import AxisStatus, StatusToken
@configure
def config(repl):
# use compact format (means no labels)
repl.bliss_bar_format = 'compact'
# add the theta axis status
repl.bliss_bar.items.append(AxisStatus('theta'))
# add a fixed message
repl.bliss_bar.items.append([[StatusToken, 'a fixed status message']])
"""
def __init__(self, items, *args, **kwargs):
self.items = list(items)
self.format = kwargs.pop('format', 'normal')
get_tokens = functools.partial(self.get_tokens, self)
super(StatusToolbar, self).__init__(get_tokens, *args, **kwargs)
@staticmethod
def get_tokens(bar, cli):
items = [gevent.spawn(item, cli) if callable(item) else item
for item in bar.items]
values = [item.get() if isinstance(item, gevent.Greenlet) else item
for item in items]
result = []
for i, value in enumerate(values):
if i > 0:
result.append(Separator)
result.extend(value)
return result
def status_bar(python_input, *items):
return StatusToolbar(items,
default_char=Char(token=StatusToken),
filter=~IsDone() & RendererHeightIsKnown() &
Condition(lambda cli: python_input.show_bliss_bar and
python_input.bliss_bar.items and
not python_input.show_exit_confirmation))
class AxisStatus(object):
def __init__(self, axis):
self.name = axis.name if isinstance(axis, Axis) else axis
def __call__(self, cli):
config = get_config()
axis = config.get(self.name)
label = axis.config.get('label', default=self.name)
unit = axis.config.get('unit', default='')
state, position = axis.state(), axis.position()
result = []
if cli.python_input.bliss_bar_format != 'compact':
result.append((StatusToken, label + ': '))
if state == 'MOVING':
token = StatusToken.Changing
else:
token = StatusToken.Ok
value = '{0:.4}{1}'.format(position, unit)
result.append((token, value))
return result
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""
Usage: bliss [--log-level=<log_level>] [(-s | --session)] <name>...
bliss [--show-sessions]
bliss
bliss (-h | --help)
Options:
--log-level=<log_level> Log level [default: WARN].
--show-sessions Display available sessions.
-s, --session Starts with some session(s).
-h, --help Show this screen.
"""
import logging
import docopt
from bliss.config import static
from .repl import embed
__all__ = ('main',)
def main():
try:
# Parse arguments, use file docstring as a parameter definition
arguments = docopt.docopt(__doc__)
session_names = arguments['<name>']
except docopt.DocoptExit as e:
print e.message
else:
log_level = getattr(logging, arguments['--log-level'].upper())
fmt = '%(levelname)s %(asctime)-15s %(name)s: %(message)s'
logging.basicConfig(level=log_level, format=fmt)
if arguments['--show-sessions']:
config = static.get_config()
print 'Session name(s):'
for name in config.names_list:
c = config.get_config(name)
if c.get('class') != 'Session': continue
if c.get_inherited('plugin') != 'session': continue
print ' '*4,name
exit(0)
embed(session_names=session_names)
if __name__ == '__main__':
main()
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Bliss prompt"""
import time
from prompt_toolkit.token import Token
from ptpython.prompt_style import PromptStyle