Commit f92dafdb authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'pointscan' into 'master'

Add variable step scan - PointScan

See merge request !572
parents 406f0528 62975582
......@@ -33,6 +33,7 @@ from bliss.scanning.acquisition.counter import SamplingCounterAcquisitionDevice,
from bliss.scanning.chain import AcquisitionChain
from bliss.scanning import scan as scan_module
from bliss.scanning.acquisition.timer import SoftwareTimerMaster
from bliss.scanning.acquisition.motor import VariableStepTriggerMaster
from bliss.scanning.acquisition.motor import LinearStepTriggerMaster, MeshStepTriggerMaster
from bliss.scanning.acquisition.lima import LimaAcquisitionMaster
from bliss.scanning.acquisition.ct2 import CT2AcquisitionMaster
......@@ -718,3 +719,59 @@ def ct(count_time, *counters, **kwargs):
kwargs.setdefault("name","ct")
return timescan(count_time, *counters, **kwargs)
def pointscan(motor, positions, count_time, *counters, **kwargs):
"""
Point scan
Scans one motor, as specified by *motor*. The motor starts at the position
given by the first value in *positions* and ends at the position given by last value *positions*.
Count time is given by *count_time* (seconds).
Args:
motor (Axis): motor to scan
positions (list): a list of positions
count_time (float): count time (seconds)
counters (BaseCounter or
MeasurementGroup): change for those counters or measurement group.
if counter is empty use the active measurement group.
Keyword Args:
name (str): scan name in data nodes tree and directories [default: 'scan']
title (str): scan title [default: 'pointscan <motor> <positions>']
save (bool): save scan data to file [default: True]
return_scan (bool): False by default
"""
scan_info = {'type': kwargs.get('type', 'pointscan'),
'save': kwargs.get('save', True),
'title': kwargs.get('title')}
npoints = len(positions)
if scan_info['title'] is None:
args = scan_info['type'], motor.name, positions[0], positions[npoints-1], npoints, count_time
template = " ".join(['{{{0}}}'.format(i) for i in range(len(args))])
scan_info['title'] = template.format(*args)
counters,other_counters = _get_all_counters(counters)
scan_info.update({'npoints': npoints, 'total_acq_time': npoints * count_time,
'motors': [TimestampPlaceholder(), motor],
'start': positions[0], 'stop': positions[npoints-1],
'counters': counters,
'other_counters': other_counters,
'count_time': count_time})
chain = AcquisitionChain(parallel_prepare=True)
timer = default_chain(chain, scan_info, counters)
top_master = VariableStepTriggerMaster(motor, positions)
chain.add(top_master, timer)
_log.info("Scanning %s from %f to %f in %d points",
motor.name, positions[0], positions[npoints-1], npoints)
scan = step_scan(chain, scan_info,
name=kwargs.setdefault("name", "pointscan"), save=scan_info['save'])
scan.run()
if kwargs.get('return_scan', False):
return scan
......@@ -295,3 +295,63 @@ class LinearStepTriggerMaster(_StepTriggerMaster):
for axis, start, stop in grouped(args, 3):
params.extend((axis, start, stop, nb_point))
_StepTriggerMaster.__init__(self, *params, **keys)
class VariableStepTriggerMaster(AcquisitionMaster):
"""
Generic motor master helper for a variable step by step acquisition.
:param *args == mot1, positions,...
Example::
_VariableStepTriggerMaster(mot1, positions, mot2, positions2)
"""
def __init__(self, *args, **keys):
trigger_type = keys.pop('trigger_type', AcquisitionMaster.SOFTWARE)
self.next_mv_cmd_arg = list()
if len(args) % 2:
raise TypeError('_VariableStepTriggerMaster: argument is a mot, positions ...')
self._motor_pos = list()
self._axes = list()
for _axis, pos_list in grouped(args, 2):
self._axes.append(_axis)
self._motor_pos.append(pos_list)
mot_group = Group(*self._axes)
group_name = '/'.join((x.name for x in self._axes))
AcquisitionMaster.__init__(self, mot_group, group_name, "zerod",
trigger_type=trigger_type, **keys)
self.channels.extend(
(AcquisitionChannel(axis.name, numpy.double, ()) for axis in self._axes))
@property
def npoints(self):
return min((len(x) for x in self._motor_pos))
def __iter__(self):
iter_pos = [iter(x) for x in self._motor_pos]
while True:
self.next_mv_cmd_arg = list()
for _axis, pos in zip(self._axes, iter_pos):
self.next_mv_cmd_arg.extend((_axis, pos.next()))
yield self
def prepare(self):
self.device.move(*self.next_mv_cmd_arg)
def start(self):
self.trigger()
def stop(self):
self.device.stop()
def trigger(self):
self.trigger_slaves()
self.channels.update_from_iterable(
[axis.position() for axis in self._axes])
self.wait_slaves()
......@@ -48,6 +48,20 @@ def test_timescan(beacon):
assert numpy.array_equal(scan_data['gaussian'], counter.data)
def test_pointscan(beacon):
session = beacon.get("test_session")
session.setup()
m0 = getattr(setup_globals, 'm0')
counter_class = getattr(setup_globals, 'TestScanGaussianCounter')
counter = counter_class("gaussian", 10, cnt_time=0)
print counter.data
points = [0.0, 1.0, 3.0, 7.0, 8.0, 10.0, 12.0, 15.0, 20.0, 50.0]
s = scans.pointscan(m0, points, 0, counter, return_scan=True, save=False)
assert m0.position() == 50.0
scan_data = scans.get_data(s)
assert numpy.array_equal(scan_data['m0'], points)
assert numpy.array_equal(scan_data['gaussian'], counter.data)
def test_scan_callbacks(beacon):
session = beacon.get("test_session")
session.setup()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment