Commit 7a5f28b6 authored by Sebastien Petitdemange's avatar Sebastien Petitdemange
Browse files

scanning tools: moved into the new proposed file structure.

├── common
│   ├── scans.py => ascan, dscan, timescan, etc.
│   └── standard.py => ct + exported functions from scans.py
├── data
│   ├── lima.py
│   ├── node.py => DataNode
│   └── scan.py
└── scanning
    ├── acquisition
    │    ├── motor.py
    │    ├── p201.py ...
    ├── chain.py => AcquisitionMaster, AcquisitionDevice, AcquisitionChain
    ├── scan.py  => Scan, ScanSaving, ScanRecorder, current writer.common.*
    └── writer
         ├── hdf5.py
parent 5cd93579
......@@ -37,16 +37,3 @@ __description__ = 'BeamLine Instrumentation Support Software'
from gevent import monkey
monkey.patch_all(thread=False)
## TODO: remove those exported functions ; need changes in motor tests
from bliss.config.motors import load_cfg, load_cfg_fromstring, get_axis, get_encoder
from bliss.controllers.motor_group import Group
##
from bliss.common.task_utils import cleanup, error_cleanup
from bliss.common.scans import *
from bliss.common.standard import *
from bliss.common.continuous_scan import Scan
from bliss.common.continuous_scan import AcquisitionChain
from bliss.common.continuous_scan import AcquisitionDevice
from bliss.common.continuous_scan import AcquisitionMaster
from bliss.common.axis import Axis
......@@ -15,8 +15,6 @@ This module gathers most common functionality to bliss (from
:toctree:
axis
continuous_scan
data_manager
encoder
event
log
......
import os, errno
import h5py
from bliss.common.continuous_scan import AcquisitionDevice,AcquisitionMaster
from bliss.scanning.chain import AcquisitionDevice,AcquisitionMaster
class FileOrganizer(object):
def __init__(self,root_path,
......
......@@ -14,7 +14,6 @@ Most common scan procedures (:func:`~bliss.common.scans.ascan`, \
__all__ = ['ascan', 'a2scan', 'dscan', 'd2scan', 'timescan', 'ct']
import time
import getpass
import logging
import numpy
......@@ -24,14 +23,14 @@ from bliss import setup_globals
from bliss.common.task_utils import *
from bliss.controllers.motor_group import Group
from bliss.common.measurement import CounterBase
from bliss.acquisition.counter import CounterAcqDevice
from bliss.common.continuous_scan import AcquisitionChain,Scan
from bliss.acquisition.timer import SoftwareTimerMaster
from bliss.acquisition.motor import LinearStepTriggerMaster
from bliss.scanning.acquisition.counter import CounterAcqDevice
from bliss.scanning.chain import AcquisitionChain
from bliss.scanning import scan as scan_module
from bliss.scanning.acquisition.timer import SoftwareTimerMaster
from bliss.scanning.acquisition.motor import LinearStepTriggerMaster
from bliss.session import session,measurementgroup
from bliss.data.writer import hdf5
from . import data_manager
from .event import dispatcher
from bliss.scanning.writer import hdf5
from .event import send
_log = logging.getLogger('bliss.scans')
......@@ -77,10 +76,10 @@ class _ScanDataWatch(object):
if self._last_point_display == -1:
counter_names = [x for x in self._channel_name_2_channel.keys() if x not in self._motors_name]
self._scan_info['counter_names'] = counter_names
dispatcher.send("scan_new",data_manager,
self._scan_info,self._root_path,
self._motors_name,self._scan_info['npoints'],
counter_names)
send(scan_module,"scan_new",
self._scan_info,self._root_path,
self._motors_name,self._scan_info['npoints'],
counter_names)
self._last_point_display += 1
min_nb_points = None
......@@ -100,8 +99,8 @@ class _ScanDataWatch(object):
values.extend((channel.get(point_nb)
for channel in self._channel_name_2_channel.values()
if channel not in motor_channels))
dispatcher.send("scan_data",data_manager,
self._scan_info,values)
send(scan_module,"scan_data",
self._scan_info,values)
if min_nb_points is not None:
self._last_point_display = min_nb_points
#check end
......@@ -111,11 +110,11 @@ class _ScanDataWatch(object):
if data_node.type() == 'zerod':
self._channel_end_nb += len(data_node.channel_name())
if self._channel_end_nb == len(self._channel_name_2_channel):
dispatcher.send("scan_end",self._scan_info)
send(scan_module,"scan_end",self._scan_info)
def _do_scan(chain,scan_info) :
scandata = data_manager.ScanSaving()
scandata = scan_module.ScanSaving()
config = scandata.get()
root_path = config['root_path']
writer = hdf5.Writer(root_path)
......@@ -123,11 +122,11 @@ def _do_scan(chain,scan_info) :
scan_info['session_name'] = scandata.session
scan_info['user_name'] = scandata.user_name
scan_data_watch = _ScanDataWatch(root_path,scan_info)
scan_recorder = data_manager.ScanRecorder(parent=config['parent'],
scan_info=scan_info,
writer=writer,
data_watch_callback=scan_data_watch)
scan = Scan(chain, scan_recorder)
scan_recorder = scan_module.ScanRecorder(parent=config['parent'],
scan_info=scan_info,
writer=writer,
data_watch_callback=scan_data_watch)
scan = scan_module.Scan(chain, scan_recorder)
scan.prepare()
scan.start()
......
......@@ -8,7 +8,7 @@
import struct
import numpy
from PyTango.gevent import DeviceProxy
from bliss.common.data_manager import DataNode
from bliss.data.node import DataNode
from bliss.config.settings import QueueObjSetting
from bliss.config.conductor import client
from bliss.config import channels
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import pkgutil
import inspect
import re
import datetime
import os
from bliss.common.event import dispatcher
from bliss.config.conductor import client
from bliss.config.settings import Struct, QueueSetting, HashObjSetting
def to_timestamp(dt, epoch=None):
    """Return *dt* expressed as seconds elapsed since *epoch*.

    When *epoch* is not given, the UNIX epoch (1970-01-01) is used.
    """
    reference = datetime.datetime(1970, 1, 1) if epoch is None else epoch
    delta = dt - reference
    return delta.days * 86400 + delta.seconds + delta.microseconds / 10**6
class _DataManager(object):
def __init__(self):
self._last_scan_data = None
def new_scan(self, motor, npoints, counters_list, env=None, save_flag=True):
from bliss.common.scans import ScanEnvironment
if env is None:
env = ScanEnvironment()
env['save'] = False
env['title'] = 'unnamed'
if isinstance(motor, list):
return Scan(motor, npoints, counters_list, env)
else:
# assuming old API
# motor: filename
# npoints: motor
# counters_list: npoints
# env: counters_list
# save_flag
scan_env = ScanEnvironment()
scan_env['save'] = save_flag
scan_env['filename'] = motor
scan_env['title'] = 'unnamed'
return Scan(npoints, counters_list, env, scan_env)
def new_timescan(self, counters_list, env):
return Timescan(counters_list, env)
def last_scan_data(self):
return self._last_scan_data
# From continuous scan
node_plugins = dict()
for importer, module_name, _ in pkgutil.iter_modules([os.path.join(os.path.dirname(__file__),'..','data')]):
node_plugins[module_name] = importer
def _get_node_object(node_type, name, parent, connection, create=False):
    """Instantiate the node class registered for *node_type*.

    Falls back to the generic DataNode when no plugin module matches the type.
    """
    importer = node_plugins.get(node_type)
    if importer is not None:
        module = importer.find_module(node_type).load_module(node_type)
        is_plugin_class = lambda x: (inspect.isclass(x)
                                     and issubclass(x, DataNode)
                                     and x != DataNode)
        members = inspect.getmembers(module, is_plugin_class)
        # a plugin module is expected to define exactly one DataNode subclass
        node_class = members[0][-1]
        return node_class(name, parent=parent, connection=connection, create=create)
    return DataNode(node_type, name, parent, connection=connection, create=create)
def get_node(name, node_type=None, parent=None, connection=None):
    """Fetch the node named *name*; return None when it has been deleted."""
    if connection is None:
        connection = client.get_cache(db=1)
    data = Struct(name, connection=connection)
    if node_type is None:
        node_type = data.node_type
    if node_type is None:
        # the redis entry no longer exists: the node has been deleted
        return None
    return _get_node_object(node_type, name, parent, connection)
def _create_node(name, node_type=None, parent=None, connection=None):
    """Create a new node of *node_type* named *name* and return it."""
    conn = client.get_cache(db=1) if connection is None else connection
    return _get_node_object(node_type, name, parent, conn, create=True)
def _get_or_create_node(name, node_type=None, parent=None, connection=None):
    """Return the existing node for *name*, creating it when absent."""
    if connection is None:
        connection = client.get_cache(db=1)
    existing_db_name = DataNode.exists(name, parent, connection)
    if existing_db_name:
        return get_node(existing_db_name, connection=connection)
    return _create_node(name, node_type, parent, connection)
class DataNodeIterator(object):
    """Depth-first iterator over a DataNode tree.

    With ``wait=True`` it additionally subscribes to redis keyspace
    notifications on the ``*_children_list`` queues so that children appended
    after the initial walk are yielded as they arrive (the generator then
    never terminates).  Requires keyspace notifications to be enabled on the
    redis server -- TODO confirm server configuration.
    """
    # extracts the parent node db name from a keyspace notification channel
    NEW_CHILD_REGEX = re.compile("^__keyspace@.*?:(.*)_children_list$")
    def __init__(self, node):
        self.node = node
        # per parent db name: index of the last child already yielded,
        # used to resume from the right position in the pubsub loop below
        self.last_child_id = dict()
    def walk(self, filter=None, wait=True):
        """Yield nodes of the subtree rooted at ``self.node``.

        filter -- a node type (or iterable of types); only nodes whose
                  ``type()`` is in *filter* are yielded (None = all nodes).
        wait   -- when True, block on redis notifications and keep yielding
                  newly appended children forever.
        """
        #print self.node.db_name(),id(self.node)
        try:
            it = iter(filter)
        except TypeError:
            # filter is a single, non-iterable value: normalize to a list
            if filter is not None:
                filter = [filter]
        if wait:
            # subscribe before walking so no child added meanwhile is missed
            redis = self.node.db_connection
            pubsub = redis.pubsub()
            pubsub.psubscribe("__keyspace*__:%s*_children_list" % self.node.db_name())
        db_name = self.node.db_name()
        self.last_child_id[db_name]=0
        if filter is None or self.node.type() in filter:
            yield self.node
        for i, child in enumerate(self.node.children()):
            iterator = DataNodeIterator(child)
            for n in iterator.walk(filter, wait=False):
                # record progress so the notification loop resumes after i
                self.last_child_id[db_name] = i
                if filter is None or n.type() in filter:
                    yield n
        if wait:
            # block on keyspace events; an 'rpush' means a children list grew
            for msg in pubsub.listen():
                if msg['data'] == 'rpush':
                    channel = msg['channel']
                    parent_db_name = DataNodeIterator.NEW_CHILD_REGEX.match(channel).groups()[0]
                    for child in get_node(parent_db_name).children(self.last_child_id.setdefault(parent_db_name, 0), -1):
                        self.last_child_id[parent_db_name]+=1
                        if filter is None or child.type() in filter:
                            yield child
class _TTL_setter(object):
def __init__(self,db_name):
self._db_name = db_name
self._disable = False
def disable(self):
self._disable = True
def __del__(self):
if not self._disable:
node = get_node(self._db_name)
if node is not None:
node.set_ttl()
class DataNode(object):
    """Base class for nodes of the redis-backed acquisition data tree.

    A node is persisted in redis under three keys derived from its db name:
    the node struct itself, ``<db_name>_children_list`` (queue of child db
    names) and ``<db_name>_info`` (hash of user metadata).
    """

    default_time_to_live = 24*3600  # redis expiry applied by set_ttl(): 1 day

    @staticmethod
    def exists(name, parent=None, connection=None):
        """Return the fully-qualified db name if the node exists, else None."""
        if connection is None:
            connection = client.get_cache(db=1)
        db_name = '%s:%s' % (parent.db_name(), name) if parent else name
        return db_name if connection.exists(db_name) else None

    def __init__(self, node_type, name, parent=None, connection=None, create=False):
        """Attach to (or, with ``create=True``, initialize) the redis keys.

        node_type -- type string, also used for plugin lookup
        name      -- short node name; the db name is ``parent:name``
        parent    -- optional parent DataNode
        create    -- when True, write the struct fields and register with
                     the parent; also arm a finalizer so a node that is
                     forgotten without set_ttl() still expires eventually
        """
        if connection is None:
            connection = client.get_cache(db=1)
        db_name = '%s:%s' % (parent.db_name(), name) if parent else name
        self._data = Struct(db_name,
                            connection=connection)
        children_queue_name = '%s_children_list' % db_name
        self._children = QueueSetting(children_queue_name,
                                      connection=connection)
        info_hash_name = '%s_info' % db_name
        self._info = HashObjSetting(info_hash_name,
                                    connection=connection)
        self.db_connection = connection
        if create:
            self._data.name = name
            self._data.db_name = db_name
            self._data.node_type = node_type
            if parent:
                self._data.parent = parent.db_name()
                parent.add_children(self)
            self._ttl_setter = _TTL_setter(self.db_name())
        else:
            self._ttl_setter = None

    def db_name(self):
        """Full (parent-qualified) redis name of this node."""
        return self._data.db_name

    def name(self):
        """Short node name."""
        return self._data.name

    def type(self):
        """Node type string."""
        return self._data.node_type

    def iterator(self):
        """Return a DataNodeIterator rooted at this node."""
        return DataNodeIterator(self)

    def add_children(self, *child):
        """Append one or more child nodes to the children queue."""
        if len(child) > 1:
            self._children.extend([c.db_name() for c in child])
        else:
            self._children.append(child[0].db_name())

    def connect(self, signal, callback):
        """Connect *callback* to *signal* emitted by this node."""
        dispatcher.connect(callback, signal, self)

    def parent(self):
        """Return the parent node, or None; drops a dangling parent link."""
        parent_name = self._data.parent
        if parent_name:
            parent = get_node(parent_name)
            if parent is None:  # parent was deleted: clean the stale link
                del self._data.parent
            return parent

    # iterate over children
    # from_id -- start child index
    # to_id   -- last child index (-1 == end of list)
    def children(self, from_id=0, to_id=-1):
        """Yield existing child nodes; stale entries are removed on the fly."""
        for child_name in self._children.get(from_id, to_id):
            new_child = get_node(child_name)
            if new_child is not None:
                yield new_child
            else:
                self._children.remove(child_name)  # clean

    def last_child(self):
        """Return the most recently appended child node (or None)."""
        return get_node(self._children.get(-1))

    def set_info(self, key, values):
        """Store *values* under *key* in the node's info hash.

        BUGFIX: the original wrote ``self._info[keys]`` (NameError, the
        parameter is ``key``) and tested the never-defined attribute
        ``self._ttl`` (AttributeError).  Use the parameter name, and refresh
        the info TTL only when a per-instance ``_ttl`` has been configured.
        """
        self._info[key] = values
        ttl = getattr(self, '_ttl', 0)
        if ttl > 0:
            self._info.ttl(ttl)

    def info_iteritems(self):
        """Iterate (key, value) pairs of the info hash."""
        return self._info.iteritems()

    def info_get(self, name):
        """Return the info value stored under *name* (or None)."""
        return self._info.get(name)

    def data_update(self, keys):
        """Update several fields of the node struct at once."""
        self._data.update(keys)

    def set_ttl(self):
        """Apply the default TTL to this node's keys and its ancestors'."""
        db_names = set(self._get_db_names())
        self._set_ttl(db_names)
        if self._ttl_setter is not None:
            # TTL handled: the finalizer must not redo it at GC time
            self._ttl_setter.disable()

    @staticmethod
    def _set_ttl(db_names):
        # one pipelined round-trip instead of one EXPIRE call per key
        redis_conn = client.get_cache(db=1)
        pipeline = redis_conn.pipeline()
        for name in db_names:
            pipeline.expire(name, DataNode.default_time_to_live)
        pipeline.execute()

    def _get_db_names(self):
        """Return all redis key names of this node plus its ancestors'."""
        db_name = self.db_name()
        children_queue_name = '%s_children_list' % db_name
        info_hash_name = '%s_info' % db_name
        db_names = [db_name, children_queue_name, info_hash_name]
        parent = self.parent()
        if parent:
            db_names.extend(parent._get_db_names())
        return db_names

    def store(self, signal, event_dict):
        """Persist incoming event data; default implementation does nothing."""
        pass
......@@ -5,9 +5,10 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import datetime
from bliss.common.data_manager import DataNode,to_timestamp
import pickle
from bliss.data.node import DataNode,to_timestamp
def _transform_dict_obj(dict_object) :
return_dict = dict()
for key,value in dict_object.iteritems():
......
import os,errno
from bliss.common import event
from bliss.common.continuous_scan import AcquisitionDevice, AcquisitionMaster
class AcquisitionMasterEventReceiver(object):
    """Forward 'start'/'end'/'new_data' signals from *slave* to ``on_event``.

    Instances are callable so they can be registered directly as event
    callbacks; subclasses implement :meth:`on_event`.
    """
    def __init__(self, master, slave, parent):
        self._master = master
        self._parent = parent
        # subscribe this receiver to the three acquisition signals of the slave
        for signal_name in ('start', 'end', 'new_data'):
            event.connect(slave, signal_name, self)

    @property
    def parent(self):
        return self._parent

    @property
    def master(self):
        return self._master

    def __call__(self, event_dict=None, signal=None, sender=None):
        return self.on_event(event_dict, signal, sender)

    def on_event(self, event_dict, signal, slave):
        raise NotImplementedError
class AcquisitionDeviceEventReceiver(object):
    """Forward 'start'/'end'/'new_data' signals from *device* to ``on_event``.

    Instances are callable so they can be registered directly as event
    callbacks; subclasses implement :meth:`on_event`.
    """
    def __init__(self, device, parent):
        self._device = device
        self._parent = parent
        # register this callable as handler for the three device signals
        for signal_name in ('start', 'end', 'new_data'):
            event.connect(device, signal_name, self)

    @property
    def parent(self):
        return self._parent

    @property
    def device(self):
        return self._device

    def __call__(self, event_dict=None, signal=None, sender=None):
        return self.on_event(event_dict, signal, sender)

    def on_event(self, event_dict, signal, device):
        raise NotImplementedError
class FileWriter(object):
    def __init__(self, root_path,
                 windows_path_mapping=None,
                 detector_temporay_path=None,
                 master_event_receiver=None,
                 device_event_receiver=None,
                 **keys):
        """ A default way to organize file structure

        root_path -- base directory under which per-scan folders are created
        windows_path_mapping -- transform unix path to windows
        i.e: {'/data/visitor/':'Y:/'}
        detector_temporay_path -- temporary path for a detector
        i.e: {detector: {'/data/visitor':'/tmp/data/visitor'}}
        (keyword keeps the historical 'temporay' spelling: renaming it would
        break existing callers)
        master_event_receiver / device_event_receiver -- factories producing
        the event receivers attached in prepare(); both are mandatory
        """
        self._root_path = root_path
        self._windows_path_mapping = windows_path_mapping or dict()
        self._detector_temporay_path = detector_temporay_path or dict()
        if None in (master_event_receiver, device_event_receiver):
            raise ValueError("master_event_receiver and device_event_receiver keyword arguments have to be specified.")
        self._master_event_receiver = master_event_receiver
        self._device_event_receiver = device_event_receiver
        self._event_receivers = list()

    def create_path(self, scan_recorder):
        """Create (if needed) and return the directory for this scan."""
        path_suffix = scan_recorder.node.name()
        full_path = os.path.join(self._root_path, path_suffix)
        try:
            os.makedirs(full_path)
        except OSError as exc:  # Python >2.5
            # an already-existing directory is fine; anything else is real
            if exc.errno == errno.EEXIST and os.path.isdir(full_path):
                pass
            else:
                raise
        return full_path

    def new_file(self, scan_file_dir, scan_recorder):
        """Hook called once per scan after the directory exists; default no-op."""
        pass

    def new_master(self, master, scan_file_dir):
        """Create and return the file entry for an acquisition master."""
        raise NotImplementedError

    def prepare(self, scan_recorder, scan_info, devices_tree):
        """Create the scan directory and wire event receivers for all devices."""
        scan_file_dir = self.create_path(scan_recorder)
        self.new_file(scan_file_dir, scan_recorder)
        self._event_receivers = list()
        for dev, node in scan_recorder.nodes.iteritems():
            if isinstance(dev, AcquisitionMaster):
                master_entry = self.new_master(dev, scan_file_dir)
                for slave in dev.slaves:
                    if isinstance(slave, AcquisitionDevice):
                        if slave.type == 'lima':
                            lima_dev = slave.device
                            lima_dev.saving_format = 'EDF'
                            lima_dev.saving_mode = 'AUTO_FRAME'
                            lima_dev.saving_frame_per_file = 1
                            camera_name = lima_dev.camera_type
                            scan_name = scan_recorder.node.name()
                            # BUGFIX: original referenced 'full_path', which is
                            # local to create_path (NameError here); use the
                            # scan directory computed above.
                            lima_dev.saving_directory = scan_file_dir
                            lima_dev.saving_prefix = '%s_%s' % (scan_name, camera_name)
                            lima_dev.saving_suffix = '.edf'
                            pass  # link
                        else:
                            self._event_receivers.append(self._device_event_receiver(slave, master_entry))
                    elif isinstance(slave, AcquisitionMaster):
                        # NOTE(review): 'slave' is passed as both master and
                        # slave arguments -- looks intentional for sub-masters,
                        # but confirm against the receiver factory signature.
                        self._event_receivers.append(self._master_event_receiver(slave, slave, master_entry))
                self._event_receivers.append(self._device_event_receiver(dev, master_entry))
......@@ -7,7 +7,7 @@
import collections
from bliss.config.settings import QueueSetting
from bliss.common.data_manager import DataNode
from bliss.data.node import DataNode
class Dataset0D(DataNode):
class DataChannel(object):
......
from __future__ import absolute_import
from bliss.common.continuous_scan import AcquisitionDevice
from ..chain import AcquisitionDevice
from bliss.common.event import dispatcher
import bliss
import numpy
......
......@@ -9,7 +9,7 @@ import numpy
import time
from gevent import event,sleep
from bliss.common.event import dispatcher
from bliss.common.continuous_scan import AcquisitionDevice,AcquisitionChannel
from ..chain import AcquisitionDevice,AcquisitionChannel
class CounterAcqDevice(AcquisitionDevice):
def __init__(self,counter,
......
......@@ -5,7 +5,7 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.common.continuous_scan import AcquisitionDevice, AcquisitionMaster, AcquisitionChannel
from ..chain import AcquisitionDevice, AcquisitionMaster, AcquisitionChannel
from bliss.common.event import dispatcher
import gevent
import time
......
......@@ -6,7 +6,7 @@
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from __future__ import absolute_import
from bliss.common.continuous_scan import AcquisitionMaster, AcquisitionChannel
from ..chain import AcquisitionMaster, AcquisitionChannel
from bliss.common import axis
from bliss.common import event
from bliss.common.event import dispatcher
......
......@@ -5,7 +5,7 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.common.continuous_scan import AcquisitionDevice, AcquisitionChannel
from ..chain import AcquisitionDevice, AcquisitionChannel
from bliss.common.event import dispatcher
import gevent
import numpy
......