Commit 09cbe38a authored by id13 blissadm's avatar id13 blissadm

added icepap case in kmap_lut

parent 81e5e9e6
__all__=["CScanMusstChanTrigCalc", "CScanMusstChanStepCalc", "CScanDisplay", "_get_musst_template", "_get_musst_store_list", "_step_per_unit"]
import numpy
import gevent
import sys
import string
import exceptions
from bliss.scanning.chain import AcquisitionChain, AcquisitionChannel
from bliss.scanning.scan import Scan
from bliss.scanning.acquisition.musst import MusstAcquisitionDevice
from bliss.scanning.acquisition.motor import MotorMaster
from bliss.scanning.acquisition.calc import CalcAcquisitionDevice
from bliss.scanning.acquisition.lima import LimaAcquisitionMaster
from bliss.scanning.acquisition.ct2 import CT2AcquisitionMaster
from bliss.scanning.acquisition.mca import McaAcquisitionDevice
from bliss.common import session
from bliss.scanning.writer import hdf5
from bliss.config.settings import Parameters
from bliss import setup_globals
from bliss.data.node import is_zerod
from bliss.common.utils import grouped
from bliss.controllers.ct2.device import AcqMode as P201AcqMode
from bliss.common.measurement import Counter
from bliss.scanning.acquisition.counter import IntegratingCounterAcquisitionDevice
_step_per_unit = lambda mot: mot.encoder.steps_per_unit if mot.encoder else mot.steps_per_unit
class CScanMusstChanTrigCalc(object):
""" Musst Trig Data Channel
trigger up value
trigger down value
mean trigger value ((up+down)/2)
delta position (down - up)
"""
def __init__(self, source_name, factor, dest_name):
self._source_name = source_name
self._factor = factor
self._dest_name = dest_name
self._data = numpy.zeros((0), dtype=numpy.int32)
def prepare(self):
self._data = numpy.zeros((0), dtype=numpy.int32)
def __call__(self, sender, data_dict):
data = data_dict.get(self._source_name)
if data is None:
return {}
self._data = numpy.append(self._data, data)
up_data = list()
down_data = list()
mean_data = list()
delta_data = list()
for up,down in grouped(self._data,2):
up /= self._factor
down /= self._factor
up_data.append(up)
down_data.append(down)
mean_data.append((up+down)/2.)
delta_data.append(down-up)
if not len(up_data):
return {}
self._data= self._data[2*len(up_data):]
return {'%s_%s' % (self._dest_name,x) : numpy.array(data, dtype=numpy.float)\
for x,data in (('mean',mean_data), ('up',up_data), ('down',down_data), ('delta',delta_data))}
@property
def acquisition_channels(self):
return [AcquisitionChannel('%s_%s' % (self._dest_name,x),numpy.float,()) \
for x in ['up','down','mean','delta']]
class CScanMusstChanStepCalc(object):
""" Musst Step Data Channel
channel value between 2 consecutive up triggers
"""
def __init__(self, source_name, factor, dest_name):
self._source_name = source_name
self._factor = factor
self._dest_name = dest_name
self._data = numpy.zeros((0), dtype=numpy.int32)
def __call__(self, sender, data_dict):
data = data_dict.get(self._source_name)
if data is None:
return {}
self._data = numpy.append(self._data, data)
step_data = list()
last_up = self._data[0] / self._factor
for (_, up) in grouped(self._data[1:],2):
up /= self._factor
step_data.append(up - last_up)
last_up = up
if not len(step_data):
return {}
self._data= self._data[2*len(step_data):]
return {'%s_step'%self._dest_name : numpy.array(step_data, dtype=numpy.float)}
@property
def acquisition_channels(self):
return [AcquisitionChannel('%s_step' % self._dest_name,numpy.float,())]
class CScanDisplay(object):
def __init__(self,motors,nb_points):
try:
iter(motors)
except TypeError:
motors = (motors,)
self.motors = motors
self.motor_start_position = dict(((x.name,x.position()) for x in motors))
self.motors_name = dict(((x.name,x) for x in motors))
self.last_motor_pos = dict(((x.name,(x.position() * _step_per_unit(x),-1,0))
for x in motors))
self.motor_display_pos = dict()
self.nb_points = nb_points
def on_state(self,state):
return True
def __call__(self,data_events,nodes,info):
if info.get('state') == Scan.PREPARE_STATE:
for motor in self.motors:
self.motor_display_pos[motor.name] = '%.4f'%motor.position()
for acq_device,events in data_events.iteritems():
data_node = nodes.get(acq_device)
if is_zerod(data_node):
channel_name = data_node.name
try:
type_name,name = channel_name.split('_')
except:
continue
if type_name == 'enc' and name in self.motors_name:
channel = data_node
mot_pos = channel.get(-1) # last
nb_pos = len(channel)
last_pos,last_nb_pos,acc_cnt = self.last_motor_pos.get(name)
if last_nb_pos == nb_pos:
continue
acc_cnt += numpy.int32(mot_pos) - numpy.int32(last_pos)
self.last_motor_pos[name] = (mot_pos,nb_pos,acc_cnt)
start_pos = self.motor_start_position.get(name)
mot = self.motors_name.get(name)
current_pos = start_pos + acc_cnt / _step_per_unit(mot)
display = '%.4f' % current_pos
self.motor_display_pos[name] = display
display = ''
motor_name = None
for motor_name,disp in self.motor_display_pos.iteritems():
display += '%s -> %s ' % (motor_name,disp)
if motor_name is not None:
_,musst_trig_position,_ = self.last_motor_pos.get(motor_name)
display += 'musst trig (%d/%d) ' % (musst_trig_position/2,self.nb_points)
if display:
print display,'\r',
sys.stdout.flush()
def _get_musst_template(master_motor, *other_chans):
""" Return musst replacement template """
(master_name, master_chan)= master_motor
data_alias= []
data_store= []
for (name, chan) in other_chans:
data_alias.append("ALIAS DATA%d = CH%d\n"%(chan, chan))
data_store.append("DATA%d "%(chan))
template_replacement= { \
"$MOTOR_CHANNEL$": "CH%d"%master_chan,
"$DATA_ALIAS$": string.join(data_alias, "\n"),
"$DATA_STORE$": string.join(data_store, " "),
}
return template_replacement
def _get_musst_store_list(master_motor, *other_chans):
""" Return list of channel name ordered by channel index """
chan_list= [ master_motor ] + list(other_chans)
def _chan_sort(x, y):
return cmp(x[1], y[1])
chan_list.sort(_chan_sort)
return ["timer"] + [ name for (name,_) in chan_list ]
......@@ -14,6 +14,8 @@ from bliss.scanning.acquisition.lima import LimaAcquisitionMaster
from bliss.common.cleanup import cleanup, axis as cleanup_axis
from bliss import setup_globals
from contextlib import contextmanager
from .cscantools import *
def akmap(fast_mot, xmin, xmax, x_nb_points,
slow_mot, ymin, ymax, y_nb_points,
......@@ -45,7 +47,6 @@ def akmap(fast_mot, xmin, xmax, x_nb_points,
(c_slow.channel_id,slow_mot.name)))]
store_list += encoder_channel + ['raw_z_adc']
timer_factor = musst_device.get_timer_factor()
x_interval = int(x_nb_points-1)
musst_acq = MusstAcquisitionDevice(musst_device,
program = 'kmap_simple.mprg',
program_start_name = 'KMAP',
......@@ -263,8 +264,14 @@ def akmap_lut(fast_mot, xmin, xmax, x_nb_points,expo_time,
chain = AcquisitionChain(parallel_prepare=True)
fast_mot.controller.output_position_gate(fast_mot, xmin, xmax)
fast_master = MotorMaster(fast_mot, xmin, xmax, x_nb_points*expo_time, undershoot=0.1)
ICEPAP_MODE = fast_mot.controller.__class__.__name__ == 'Icepap'
if not ICEPAP_MODE:
fast_mot.controller.output_position_gate(fast_mot, xmin, xmax)
undershoot=0.1 # piezo
else:
undershoot=None
fast_master = MotorMaster(fast_mot, xmin, xmax, x_nb_points*expo_time, undershoot=undershoot)
top_master = VariableStepTriggerMaster(*slow_motors_positions,
broadcast_len=x_nb_points)
......@@ -277,31 +284,61 @@ def akmap_lut(fast_mot, xmin, xmax, x_nb_points,expo_time,
gate_width = 1e-6 if not vlms_cameras_name else 100e-6
musst_device.ABORT
c_fast = musst_device.get_channel_by_name(fast_mot.name)
store_list = ['time','counter1','counter2','raw_adc3']
source_name = 'raw_%s_adc' % fast_mot.name
store_list.append(source_name)
store_list += ['raw_y_adc','raw_z_adc']
timer_factor = musst_device.get_timer_factor()
x_interval = int(x_nb_points-1)
vars = [{"NPOINTS_LINE" : x_nb_points,
"NLINES" : 1,
"SAMPLING_PERIOD" : int(numpy.ceil(expo_time * timer_factor)),
"GATE_WIDTH" : int(numpy.ceil(gate_width * timer_factor)),
if not ICEPAP_MODE:
store_list = ['time','counter1','counter2','raw_adc3']
source_name = 'raw_%s_adc' % fast_mot.name
store_list.append(source_name)
store_list += ['raw_y_adc','raw_z_adc']
vars = [{"NPOINTS_LINE" : x_nb_points,
"NLINES" : 1,
"SAMPLING_PERIOD" : int(numpy.ceil(expo_time * timer_factor)),
"GATE_WIDTH" : int(numpy.ceil(gate_width * timer_factor)),
}]
vars += [] * top_master.npoints
musst_acq = MusstAcquisitionDevice(musst_device,
program = 'kmap_simple.mprg',
program_start_name = 'KMAP',
#program_abort_name = 'CLEANUP',
store_list = store_list,
vars = vars)
else:
enc_name = 'enc_%s' % fast_mot.name
enc_start = int(xmin * fast_mot.steps_per_unit)
template_replacement = _get_musst_template((enc_name, c_fast.channel_id))
store_list = _get_musst_store_list((enc_name, c_fast.channel_id))
vars = [{"POSSTART" : enc_start,
"TIMEDELTA" : int((expo_time + gate_width) * timer_factor),
"SCANMODE" : 0,
"NPULSES" : x_nb_points,
"GATEWIDTH" : int(numpy.ceil(expo_time*timer_factor)),
}]
vars += [] * top_master.npoints
musst_acq = MusstAcquisitionDevice(musst_device,
program = 'kmap_simple.mprg',
program_start_name = 'KMAP',
#program_abort_name = 'CLEANUP',
store_list = store_list,
vars = vars)
vars += [] * top_master.npoints
musst_acq = MusstAcquisitionDevice(musst_device,
program = 'contscan.mprg',
program_start_name = 'CONTSCAN',
#program_abort_name = 'CONTSCAN_CLEAN',
store_list = store_list,
program_template_replacement=template_replacement,
vars = vars)
chain.add(fast_master, musst_acq)
#Piezo position calculation
scaling, offset = c_fast.switch.scaling_and_offset
dest_name = '%s_position' % fast_mot.name
conversion = lambda data : (data * (10. / 0x7fffffff)) / scaling - offset
fast_master.add_external_channel(musst_acq, source_name, dtype=numpy.float,
rename=dest_name, conversion=conversion)
if not ICEPAP_MODE:
scaling, offset = c_fast.switch.scaling_and_offset
dest_name = '%s_position' % fast_mot.name
conversion = lambda data : (data * (10. / 0x7fffffff)) / scaling - offset
fast_master.add_external_channel(musst_acq, source_name, dtype=numpy.float,
rename=dest_name, conversion=conversion)
else:
# synchronize the encoder
c_fast.value = fast_mot.position() * fast_mot.steps_per_unit
calc_up_counter = '%s_up' % fast_mot.name
mean_pos = CScanMusstChanTrigCalc(enc_name,_step_per_unit(fast_mot), fast_mot.name)
calc_device = CalcAcquisitionDevice('mean_pos',(musst_acq,),
mean_pos,mean_pos.acquisition_channels)
chain.add(fast_master,calc_device)
dest_name = '%s_position' % fast_mot.name
fast_master.add_external_channel(calc_device, calc_up_counter,rename=dest_name)
nb_points = x_nb_points * top_master.npoints
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment