Commit cc1c1344 authored by bliss administrator's avatar bliss administrator

Blackify PEL code

parent c1474216
Pipeline #10097 failed with stages
from bliss.common.scans import *
from bliss.common.plot import *
def PELscan(mot, p_st, p_en, nbp, t_int, *counters, **kwargs):
display_help = kwargs.pop('help', False)
display_help = kwargs.pop("help", False)
if display_help:
print "PELscan(mot, start, stop, nbp, int_time, interf1, counter.... nbscan=n, backlash=val, bidir=True/False, sleep_time=t(s)"
print(
"PELscan(mot, start, stop, nbp, int_time, interf1, counter.... nbscan=n, backlash=val, bidir=True/False, sleep_time=t(s)"
)
return
nbs = kwargs.pop('nbs', 1)
backlash = kwargs.pop('backlash', 0)
bidir = kwargs.pop('bidir', False)
return_scan = kwargs.get('return_scan', False)
kwargs['return_scan'] = True
nbs = kwargs.pop("nbs", 1)
backlash = kwargs.pop("backlash", 0)
bidir = kwargs.pop("bidir", False)
return_scan = kwargs.get("return_scan", False)
kwargs["return_scan"] = True
for loop in range(nbs):
if bidir and loop%2 == 1:
pos_bckl = p_en + backlash
if bidir and loop % 2 == 1:
pos_bckl = p_en + backlash
pos_start = p_en
pos_end = p_st
pos_end = p_st
else:
pos_bckl = p_st - backlash
pos_bckl = p_st - backlash
pos_start = p_st
pos_end = p_en
print "Move motor \"%s\" to %g"%(mot.name, pos_bckl)
pos_end = p_en
print('Move motor "%s" to %g' % (mot.name, pos_bckl))
mot.move(pos_bckl)
print mot.position()
print(mot.position())
scan = ascan(mot, pos_start, pos_end, nbp, t_int, *counters, **kwargs)
data = get_data(scan)
# plot motor pos vs. counters by default
# do not plot interferometer since it doesn't give a useful result
counters_to_plot = [name for name in data.dtype.fields
if name not in ('timestamp', mot.name) and not name.startswith('interf')]
counters_to_plot = [
name
for name in data.dtype.fields
if name not in ("timestamp", mot.name) and not name.startswith("interf")
]
# Plotting de-activated by CG 6 aug. 2018... causes crash with flint :(
# x = data[mot.name]
......
......@@ -22,80 +22,91 @@ from bliss.common.measurement import IntegratingCounter
from bliss.config.settings import HashSetting
from bliss.scanning.chain import AcquisitionMaster
class TrigCounter(IntegratingCounter):
def __init__(self, name, config):
IntegratingCounter.__init__(self, name, None, self)
trig_config = config['trigger_device']
self._opiom_card = trig_config.get('name')
self._opiom_counter = trig_config.\
get('counter_number', 1) # default 1
high_time = trig_config.get('high_time', 1)
period = trig_config.get('period', 1)
nb_pulses = trig_config.get('nb_pulses', 1)
self.__trigger_clock = config.get('_clock', 2000000)
self._counter_config = HashSetting('%s:counter_config' % self.name,
default_values={'high_time' : high_time,
'period' : period,
'nb_pulses' : nb_pulses})
trig_config = config["trigger_device"]
self._opiom_card = trig_config.get("name")
self._opiom_counter = trig_config.get("counter_number", 1) # default 1
high_time = trig_config.get("high_time", 1)
period = trig_config.get("period", 1)
nb_pulses = trig_config.get("nb_pulses", 1)
self.__trigger_clock = config.get("_clock", 2000000)
self._counter_config = HashSetting(
"%s:counter_config" % self.name,
default_values={
"high_time": high_time,
"period": period,
"nb_pulses": nb_pulses,
},
)
@property
def clock(self):
return self.__trigger_clock
@property
def high_time(self):
"""
High time in second for the pulse
"""
return self._counter_config['high_time']
return self._counter_config["high_time"]
@high_time.setter
def high_time(self, value):
self._counter_config['high_time'] = value
self._counter_config["high_time"] = value
@property
def period(self):
"""
period in second for the pulse (high + low level)
"""
return self._counter_config['period']
return self._counter_config["period"]
@period.setter
def period(self, value):
self._counter_config['period'] = value
self._counter_config["period"] = value
@property
def nb_pulses(self):
"""
Nb pulses generated when trigged
"""
return self._counter_config['nb_pulses']
return self._counter_config["nb_pulses"]
@nb_pulses.setter
def nb_pulses(self, value):
self._counter_config['nb_pulses'] = value
self._counter_config["nb_pulses"] = value
def get_value(self, from_index=0):
return [self.nb_pulses]
def create_master_device(self, scan_pars, **settings):
npoints = scan_pars.get('npoints', 1)
count_time = scan_pars.get('count_time', 1)
npoints = scan_pars.get("npoints", 1)
count_time = scan_pars.get("count_time", 1)
return _TrigCounterAcqMaster(self, self.name,
npoints=npoints,
count_time=count_time)
return _TrigCounterAcqMaster(
self, self.name, npoints=npoints, count_time=count_time
)
def __repr__(self):
params = ' '.join(['%s=%s' % (k, v) for k, v in
self._counter_config.iteritems()])
params = " ".join(
["%s=%s" % (k, v) for k, v in self._counter_config.iteritems()]
)
return "TrigCounter %s: %s" % (self.name, params)
class _TrigCounterAcqMaster(AcquisitionMaster):
def __init__(self, device, name, npoints=None, count_time=None):
AcquisitionMaster.__init__(self, device, name, npoints=npoints,
prepare_once=True, start_once=True)
AcquisitionMaster.__init__(
self, device, name, npoints=npoints, prepare_once=True, start_once=True
)
self._count_time = count_time
def prepare(self):
......@@ -105,12 +116,13 @@ class _TrigCounterAcqMaster(AcquisitionMaster):
low_time = (self.device.period - self.device.high_time) * self.device.clock
nb_pulses = self.device.nb_pulses
opiom.comm("CNT %d RESET" % counter)
opiom.comm("CNT %d CLK2 PULSE %d %d %d" %\
(counter, high_time, low_time, nb_pulses))
opiom.comm(
"CNT %d CLK2 PULSE %d %d %d" % (counter, high_time, low_time, nb_pulses)
)
def add_counter(self, counter):
pass
def start(self):
if not self.parent:
self.trigger()
......@@ -135,5 +147,3 @@ class _TrigCounterAcqMaster(AcquisitionMaster):
if not running:
break
gevent.idle()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment