...
 
Commits (18)
......@@ -23,6 +23,7 @@ from .connection import SpecConnection
from .reply import SpecReply
from .wait import waitConnection
from .error import SpecClientTimeoutError, SpecClientError, SpecClientNotConnectedError
from bliss.common import event
class wrap_errors(object):
......@@ -157,7 +158,7 @@ class SpecCommandA(BaseSpecCommand):
event.disconnect(self.connection, "connected", self._connected)
event.disconnect(self.connection, "disconnected", self._disconnected)
self.connection = connection.SpecConnection(specVersion)
self.connection = SpecConnection(specVersion)
self.specVersion = specVersion
event.connect(self.connection, "connected", self._connected)
......@@ -193,22 +194,14 @@ class SpecCommandA(BaseSpecCommand):
self.beginWait()
with gevent.Timeout(timeout, SpecClientTimeoutError):
waiter = SpecWaitObject(self.connection)
waiter.waitConnection()
if self.connection.serverVersion < 3:
if isinstance(command, str):
id = self.connection.send_msg_cmd_with_return(
command, self.replyArrived
)
else:
if isinstance(command, str):
id = self.connection.send_msg_cmd_with_return(
command, self.replyArrived
)
else:
id = self.connection.send_msg_func_with_return(
command, self.replyArrived
)
id = self.connection.send_msg_func_with_return(
command, self.replyArrived
)
t = gevent.spawn(wrap_errors(wait_end_of_spec_cmd), self)
......@@ -250,30 +243,3 @@ class SpecCommandA(BaseSpecCommand):
return
self.connection.abort()
class SpecCommand(SpecCommandA):
def __init__(self, *args, **kwargs):
SpecCommandA.__init__(self, *args, **kwargs)
def connectToSpec(self, specVersion, timeout):
SpecCommandA.connectToSpec(self, specVersion, timeout)
if not self.connection.isSpecConnected():
with gevent.Timeout(timeout, SpecClientTimeoutError):
waitConnection(self.connection, timeout)
self._connected()
def abort(self):
if self.connection is None or not self.connection.isSpecConnected():
return
self.connection.abort(wait=True)
def __call__(self, *args, **kwargs):
wait = kwargs.get("wait", True)
timeout = kwargs.get("timeout", None)
return SpecCommandA.__call__(self, *args, wait=wait, timeout=timeout)
def executeCommand(self, command, wait=True, timeout=None):
return SpecCommandA.executeCommand(self, command, wait, timeout)
......@@ -223,20 +223,6 @@ class Shutter(object):
def __exit__(self, exc_type, exc_value, traceback):
self.close()
@property
def is_open(self):
return self.state == self.OPEN
@property
def is_closed(self):
return self.state == self.CLOSED
def __enter__(self):
self.open()
def __exit__(self, exc_type, exc_value, traceback):
self.close()
@property
def external_control(self):
return self._external_ctrl
......
......@@ -196,7 +196,7 @@ class Icepap(Controller):
status = axis._state()
else:
last_power_time = self._last_axis_power_time.get(axis, 0)
if time.time() - last_power_time < 1.0:
if time.time() - last_power_time < 1.0 and not isinstance(axis, LinkedAxis):
status = int(_command(self._cnx, "%s:?STATUS" % axis.address), 16)
else:
self._last_axis_power_time.pop(axis, None)
......
......@@ -5,8 +5,15 @@
# Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.common.axis import Axis, NoSettingsAxis, DEFAULT_POLLING_TIME, lazy_init
from bliss.common.axis import (
Axis,
NoSettingsAxis,
DEFAULT_POLLING_TIME,
lazy_init,
Motion,
)
from . import _ackcommand, _command
import gevent
class LinkedAxis(Axis):
......@@ -133,15 +140,20 @@ class LinkedAxis(Axis):
# create motion object for hooks
motion = Motion(self, switch, None, "homing")
self.__execute_pre_move_hook(motion)
self._Axis__execute_pre_move_hook(motion)
def start_one(controller, motions):
cnx = controller._cnx
cmd = "HOME STRICT %s %s" % (
motions[0].axis.address,
("+1" if motions[0].target_pos > 0 else "-1"),
home_dir = "+1" if motions[0].target_pos > 0 else "-1"
cmd = "#HOME STRICT %s" % " ".join(
(str(rm.address) + " " + home_dir for rm in self.real_axes)
)
_command(
cnx,
cmd,
pre_cmd="#DISPROT ALL %s ; "
% " ".join((str(rm.address) for rm in self.real_axes)),
)
_ackcommand(cnx, cmd)
# IcePAP status is not immediately MOVING after home search command is sent
gevent.sleep(0.2)
......
......@@ -461,7 +461,15 @@ class musst(object):
buffer_size, nb_buffer = self.get_event_buffer_size()
buffer_memory = buffer_size * nb_buffer
current_offset, current_buffer_id = self.get_event_memory_pointer()
for i in range(10):
curr_state = self.STATE
current_offset, current_buffer_id = self.get_event_memory_pointer()
if current_buffer_id == 0 and current_offset != 64:
break
if curr_state != self.RUN_STATE:
break
gevent.sleep(100e-3) # wait a little bit before re-asking
current_offset = current_buffer_id * buffer_size + current_offset
from_offset = (from_event_id * nb_counters) % buffer_memory
......
"""Controller for the mythen 1d detector."""
import numpy
from .mythenlib import MythenInterface
from bliss.common.measurement import BaseCounter, counter_namespace
from bliss.scanning.chain import AcquisitionDevice, AcquisitionChannel
def interface_property(name, mode=False):
getter_name = name + "_enabled" if mode else "get_" + name
setter_name = "enable_" + name if mode else "set_" + name
assert getter_name in dir(MythenInterface)
assert setter_name in dir(MythenInterface)
def getter(self):
return getattr(self._interface, getter_name)()
def setter(self, value):
return getattr(self._interface, setter_name)(value)
return property(getter, setter)
class Mythen(object):
_settings = [
# General configuration
"nmodules",
# Acquisition configuration
"delay_after_frame",
"nframes",
"nbits",
"exposure_time",
# Detector configuration
"energy",
"threshold",
# Data correction settings
"bad_channel_interpolation",
"flat_field_correction",
"rate_correction",
"rate_correction_deadtime",
# Trigger / Gate settings
"continuous_trigger_mode",
"single_trigger_mode",
"delay_before_frame",
"gate_mode",
"ngates",
"input_polarity",
"output_polarity",
]
def __init__(self, name, config):
self._name = name
self._config = config
self._hostname = config["hostname"]
self._interface = MythenInterface(self._hostname)
self._apply_configuration()
self._counter = MythenCounter(self)
def finalize(self):
self._interface.close()
# Counter access
@property
def counters(self):
return counter_namespace([self._counter])
# Manage configuration
def _apply_configuration(self):
if self._config.get("apply_defaults"):
self.reset()
for key, value in self._config.items():
if key in self._settings:
setattr(self, key, value)
def _get_configuration(self):
return [(key, getattr(self, key)) for key in self._settings]
def __repr__(self):
lines = ["Mythen on {}:".format(self._hostname)]
lines += [
" {:<25s} = {}".format(key, value)
for key, value in self._get_configuration()
]
return "\n".join(lines)
# Bliss properties
@property
def name(self):
return self._name
@property
def hostname(self):
return self._hostname
# General configuration
nmodules = interface_property("nmodules")
# Acquisition configuration
delay_after_frame = interface_property("delayafterframe")
nframes = interface_property("nframes")
nbits = interface_property("nbits")
exposure_time = interface_property("exposure_time")
# Detector configuration
energy = interface_property("energy")
threshold = interface_property("kthresh")
# Data correction configuration
bad_channel_interpolation = interface_property("badchannelinterpolation", mode=True)
flat_field_correction = interface_property("flatfieldcorrection", mode=True)
rate_correction = interface_property("ratecorrection", mode=True)
rate_correction_deadtime = interface_property("ratecorrection_deadtime")
# Trigger / Gate configuration
continuous_trigger_mode = interface_property("continuoustrigger", mode=True)
single_trigger_mode = interface_property("singletrigger", mode=True)
gate_mode = interface_property("gatemode", mode=True)
delay_before_frame = interface_property("delaybeforeframe")
ngates = interface_property("ngates")
input_polarity = interface_property("inputpolarity")
output_polarity = interface_property("outputpolarity")
# Expose all interface getters
def __getattr__(self, attr):
if attr.startswith("get_") and attr in dir(self._interface):
return getattr(self._interface, attr)
raise AttributeError(attr)
def __dir__(self):
lst = list(self.__dict__)
lst += dir(type(self))
lst += [key for key in dir(self._interface) if key.startswith("get_")]
return sorted(set(lst))
# Commands
def reset(self):
self._interface.reset()
def start(self):
self._interface.start()
def stop(self):
self._interface.stop()
def readout(self):
return self._interface.readout(1)[0]
# Acquisition routine
def run(self, acquisition_number=1, acquisition_time=1.):
self.nframes = acquisition_number
self.exposure_time = acquisition_time
try:
self.start()
for _ in range(acquisition_number):
yield self.readout()
finally:
self.stop()
# Mythen counter
class MythenCounter(BaseCounter):
# Initialization
def __init__(self, controller):
self._name = "spectrum"
self._controller = controller
@property
def name(self):
return self._name
@property
def controller(self):
return self._controller
# Data properties
@property
def dtype(self):
return numpy.int32
@property
def shape(self):
return (self.controller.get_nchannels(),)
# Get acquisition device
def create_acquisition_device(self, scan_pars, **settings):
scan_pars.update(settings)
count_time = scan_pars.pop("count_time")
return MythenAcquistionDevice(self, count_time, **scan_pars)
class MythenAcquistionDevice(AcquisitionDevice):
# Initialization
def __init__(self, counter, count_time, **kwargs):
self.kwargs = kwargs
self.counter = counter
self.count_time = count_time
valid_names = ("npoints", "trigger_type", "prepare_once", "start_once")
valid_kwargs = {
key: value for key, value in kwargs.items() if key in valid_names
}
super(MythenAcquistionDevice, self).__init__(
counter.controller, counter.controller.name, **valid_kwargs
)
self.channels.append(
AcquisitionChannel(counter.name, counter.dtype, counter.shape)
)
def add_counter(self, counter):
assert self.counter == counter
# Flow control
def prepare(self):
self.device.nframes = self.npoints
self.device.exposure_time = self.count_time
def start(self):
self.device.start()
def trigger(self):
# Software trigger is not supported by the mythen
pass
def reading(self):
spectrum = self.device.readout()
self.channels.update({self.counter.name: spectrum})
def stop(self):
self.device.stop()
import enum
import socket
import operator
from functools import partial
from collections import namedtuple
from six.moves import reduce
import numpy as np
from gevent.lock import Semaphore as Lock
# Constants
UDP_PORT = 1030
TCP_PORT = 1031
RETURN_TYPES = {
"char": (1, bytes.decode),
"int": (4, partial(np.frombuffer, dtype=np.int32)),
"float": (4, partial(np.frombuffer, dtype=np.float32)),
"long long": (8, partial(np.frombuffer, dtype=np.int64)),
}
MATERIALS = {0: "silicon"}
ERROR_CODES = {
-1: "Unknown command",
-2: "Invalid argument",
-3: "Unknown settings",
-4: "Out of memory",
-5: "Module calibration files not found",
-6: "Readout failed",
-7: "Acquisition not finished",
-8: "Failure while reading temperature and humidity sensor",
-9: "Invalid license key",
-10: "Flatfield file not found",
-11: "Bad channel file not found",
-12: "Energy calibration file not found",
-13: "Noise file not found",
-14: "Trimbit file not found",
-15: "Invalid format of the flatfield file",
-16: "Invalid format of the bad channel file",
-17: "Invalid format of the energy calibration file",
-18: "Invalid format of the noise file",
-19: "Invalid format of the trimbit file",
-20: "Version file not found",
-21: "Invalid format of the version file",
-22: "Gain calibration file not found",
-23: "Invalid format of the gain calibration file",
-24: "Dead time file not found",
-25: "Invalid format of the dead time file",
-26: "High voltage file not found",
-27: "Invalid format of high voltage file",
-28: "Energy threshold relation file not found",
-29: "Invalid format of the energy threshold relation file",
-30: "Could not create log file",
-31: "Could not close log file",
-32: "Could not read log file",
-50: "No modules connected",
-51: "Error during module communication",
-52: "DCS initialization failed",
-53: "Could not store customer flatfield",
}
# Types
class MythenError(IOError):
pass
class MythenCommandError(MythenError):
def __init__(self, error_code, command):
self.error_code = error_code
self.error_message = ERROR_CODES.get(error_code)
self.command = command
self.args = error_code, self.error_message, command
def __str__(self):
return "[Error {}] {} ({!r})".format(
self.error_code, self.error_message, self.command
)
class MythenCompatibilityError(MythenError):
def __init__(self, version, requirement, command):
self.version = version
self.requirement = requirement
self.command = command
self.args = version, requirement, command
def __str__(self):
str_version = ".".join(map(str, self.version))
str_requirement = self.requirement[:2] + " " + self.requirement[2:]
return "[Version {}] Command {!r} requires version {}".format(
str_version, self.command, str_requirement
)
Status = namedtuple("Status", "running inactive_exposure empty_buffer")
Polarity = enum.IntEnum("Polarity", "Low High", start=0)
# Helpers
def run_command(sock, command, return_type="int", return_shape=(), payload=b""):
# Get return info
return_size = reduce(operator.mul, return_shape, 1)
nbytes, decode = RETURN_TYPES[return_type]
nbytes_error, decode_error = RETURN_TYPES["int"]
# Craft the command
if not isinstance(command, str):
command = " ".join(map(str, command))
# Send the command
raw_command = command.encode()
if payload:
raw_command += b" " + payload
sock.sendall(raw_command)
# Get the reply
raw_data = b""
total_size = nbytes * return_size
while total_size != len(raw_data):
raw_data += sock.recv(nbytes * return_size)
# Detect error packet
if len(raw_data) != nbytes_error:
continue
# Detect error code
error_code = np.asscalar(decode_error(raw_data))
if error_code >= 0:
continue
# Raise mythen error
raise MythenCommandError(error_code, command)
# Decode the data
data = decode(raw_data)
# Return string
if return_type == "char":
return data.split("\x00")[0]
# Return scalar
if return_shape == ():
return np.asscalar(data)
# Return array
data.shape = return_shape
return data
def convert(array, nbits):
dtypes = {24: "int32", 16: "int16", 8: "int8", 4: "int8"}
array.dtype = dtypes[nbits]
if nbits == 24:
array = array << 8 >> 8
if nbits == 4:
array = np.stack([array << 4 >> 4, array >> 4], axis=-1)
array.shape = (array.size,)
return array
# Mythen interface
class MythenInterface:
def __init__(self, hostname, protocol=socket.SOCK_STREAM, timeout=15.):
self._lock = Lock()
self._hostname = hostname
self._port = TCP_PORT if protocol == socket.SOCK_STREAM else UDP_PORT
self._sock = socket.socket(socket.AF_INET, protocol)
self._sock.settimeout(timeout)
self._sock.connect((self._hostname, self._port))
self._version = self.get_version()
def close(self):
self._sock.close()
def _run_command(self, *args, **kwargs):
with self._lock:
return run_command(self._sock, *args, **kwargs)
def _check_version(self, requirement, command):
sign, version = requirement[:2], requirement[2:]
sign = {">=": operator.ge, "<=": operator.le}[sign]
version = tuple(map(int, version.split(".")))
if not sign(self._version, version):
raise MythenCompatibilityError(self._version, requirement, command)
# Special getters
def get_version(self):
version = self._run_command("-get version", "char", (7,))
return tuple(map(int, version[1:].split(".")))
def get_nmodules(self):
return self._run_command("-get nmodules", "int")
def get_modchannels(self):
nmodules = self.get_nmodules()
if self._version < (4,):
return (1280,) * nmodules
shape = (nmodules,)
modchannels = self._run_command("-get modchannels", "int", shape)
return tuple(modchannels)
def get_nchannels(self):
modchannels = self.get_modchannels()
return sum(modchannels)
# General getters
def get_assemblydate(self):
assemblydate = self._run_command("-get assemblydate", "char", (50,))
return assemblydate.rstrip("\n")
def get_badchannels(self):
shape = (self.get_nchannels(),)
return self._run_command("-get badchannels", "int", shape)
def get_commandid(self):
return self._run_command("-get commandid", "int")
def get_commandsetid(self):
command = "-get commandsetid"
self._check_version(">=4.0.0", command)
return self._run_command(command, "int")
def get_dcstemperature(self):
command = "-get dcstemperature"
self._check_version(">=4.0.0", command)
return self._run_command(command, "float")
def get_frameratemax(self):
command = "-get frameratemax"
self._check_version(">=4.0.0", command)
return self._run_command(command, "float")
def get_fwversion(self):
command = "-get fwversion"
self._check_version(">=4.0.0", command)
return self._run_command(command, "char", (9,))
def get_humidity(self):
command = "-get humidity"
self._check_version(">=4.0.0", command)
shape = (self.get_nmodules(),)
array = self._run_command(command, "float", shape)
return tuple(array)
def get_highvoltage(self):
command = "-get hv"
self._check_version(">=4.0.0", command)
shape = (self.get_nmodules(),)
array = self._run_command(command, "int", shape)
return tuple(array)
def get_modfwversion(self):
command = "-get modfwversion"
self._check_version(">=4.0.1", command)
shape = (self.get_nmodules() * 8 + 1,)
return self._run_command(command, "char", shape)
def get_modnum(self):
shape = (self.get_nmodules(),)
array = self._run_command("-get modnum", "int", shape)
return tuple(map(hex, array))
def get_module(self):
return self._run_command("-get module", "int")
def get_nmaxmodules(self):
return self._run_command("-get nmaxmodules", "int")
def get_sensormaterial(self):
shape = (self.get_nmodules(),)
array = self._run_command("-get sensormaterial", "int", shape)
return tuple(MATERIALS.get(x, x) for x in array)
def get_sensorthickness(self):
shape = (self.get_nmodules(),)
array = self._run_command("-get sensorthickness", "int", shape)
return tuple(array)
def get_sensorwidth(self):
command = "-get sensorwidth"
self._check_version(">=4.0.0", command)
shape = (self.get_nmodules(),)
array = self._run_command(command, "int", shape)
return tuple(array)
def get_systemnum(self):
return self._run_command("-get systemnum", "int")
def get_temperature(self):
command = "-get temperature"
self._check_version(">=4.0.0", command)
shape = (self.get_nmodules(),)
array = self._run_command(command, "float", shape)
return tuple(array)
def get_testversion(self):
command = "-get testversion"
self._check_version(">=4.0.0", command)
return self._run_command(command, "char", (7,))
# General commands
def select_module(self, module_id):
command = "-module {}".format(module_id)
self._run_command(command, "int")
def select_all_modules(self):
self.select_module(0xffff)
def set_nmodules(self, nmodules):
command = "-nmodules {}".format(nmodules)
self._run_command(command, "int")
def reset(self):
self._run_command("-reset", "int")
# Acquisition control
def get_delayafterframe(self):
time_100ns = self._run_command("-get delafter", "long long")
return float(time_100ns) / 1e7
def set_delayafterframe(self, time=0.):
time_100ns = int(time * 1e7)
command = "-delafter {}".format(time_100ns)
self._run_command(command, "int")
def get_nframes(self):
return self._run_command("-get frames", "int")
def set_nframes(self, nframes=1):
command = "-frames {}".format(nframes)
self._run_command(command, "int")
def get_nbits(self):
return self._run_command("-get nbits", "int")
def set_nbits(self, nbits=24):
command = "-nbits {}".format(nbits)
self._run_command(command, "int")
def get_exposure_time(self):
time_100ns = self._run_command("-get time", "long long")
return float(time_100ns) / 1e7
def set_exposure_time(self, time=1.):
time_100ns = int(time * 1e7)
command = "-time {}".format(time_100ns)
self._run_command(command, "int")
def get_status(self):
bits = self._run_command("-get status", "int")
args = bits & 0x00001, bits & 0x00008, bits & 0x10000
return Status(*map(bool, args))
def get_readouttimes(self):
command = "-get readouttimes"
self._check_version(">=4.0.0", command)
array = self._run_command(command, "long long", (4,))
return dict(zip([24, 16, 8, 4], array))
def start(self):
self._run_command("-start", "int")
def stop(self):
self._run_command("-stop", "int")
def readout(self, n=1):
command = "-readout {}".format(n)
if n > 1:
self._check_version(">=4.0.0", command)
shape = n, self.get_nchannels()
return self._run_command(command, "int", shape)
def raw_readout(self, nbits=None, nchannels=None):
command = "-readoutraw"
self._check_version("<=3.0.0", command)
# Get nbits argument
if nbits is None:
nbits = self.get_nbits()
# Get nchannels argument
if nchannels is None:
nchannels = self.get_nchannels()
# Get shape
nbits_to_factor = {24: 1, 16: 2, 8: 4, 4: 8}
shape = (nchannels // nbits_to_factor[nbits],)
# Run the command
array = self._run_command(command, "int", shape)
# Decode result
return convert(array, nbits)
# Detector settings
def get_energy(self):
shape = (self.get_nmodules(),)
array = self._run_command("-get energy", "float", shape)
return tuple(array)
def set_energy(self, energy=8.05):
command = "-energy {}".format(energy)
self._run_command(command, "int")
def get_energymin(self):
shape = (self.get_nmodules(),)
array = self._run_command("-get energymin", "float", shape)
return tuple(array)
def get_energymax(self):
shape = (self.get_nmodules(),)
array = self._run_command("-get energymax", "float", shape)
return tuple(array)
def get_kthresh(self):
shape = (self.get_nmodules(),)
array = self._run_command("-get kthresh", "float", shape)
return tuple(array)
def set_kthresh(self, kthresh=6.4):
command = "-kthresh {}".format(kthresh)
self._run_command(command, "int")
def get_kthreshmin(self):
shape = (self.get_nmodules(),)
array = self._run_command("-get kthreshmin", "float", shape)
return tuple(array)
def get_kthreshmax(self):
shape = (self.get_nmodules(),)
array = self._run_command("-get kthreshmax", "float", shape)
return tuple(array)
def set_kthresh_and_energy(self, kthresh=6.4, energy=8.05):
command = "-kthreshenergy {} {}".format(kthresh, energy)
self._run_command(command, "int")
def load_predefined_settings(self, element):
element = element.capitalize().strip()
if element not in ("Cu", "Mo", "Cr", "Ag"):
raise ValueError("Element {} is not supported".format(element))
command = "-settings {}".format(element)
self._run_command(command, "int")
# Data correction
def badchannelinterpolation_enabled(self):
command = "-get badchannelinterpolation"
value = self._run_command(command, "int")
return bool(value)
def enable_badchannelinterpolation(self, enable):
command = "-badchannelinterpolation {}".format(int(enable))
self._run_command(command, "int")
def flatfieldcorrection_enabled(self):
command = "-get flatfieldcorrection"
value = self._run_command(command, "int")
return bool(value)
def enable_flatfieldcorrection(self, enable):
command = "-flatfieldcorrection {}".format(int(enable))
self._run_command(command, "int")
def get_flatfield(self):
shape = (self.get_nchannels(),)
return self._run_command("-get flatfield", "int", shape)
def load_flatfield(self, slot):
command = "-loadflatfield {}".format(slot)
self._check_version(">=4.0.0", command)
self._run_command(command, "int")
def set_flatfield(self, slot, correction):
command = "-flatfield {}".format(slot)
self._check_version(">=4.0.0", command)
correction = np.array(correction, dtype="uint32")
correction.shape = (self.get_nchannels(),)
payload = correction.tobytes()
self._run_command(command, "int", payload=payload)
def get_flatfield_cutoff(self):
return self._run_command("-get cutoff", "int")
def ratecorrection_enabled(self):
command = "-get ratecorrection"
value = self._run_command(command, "int")
return bool(value)
def enable_ratecorrection(self, enable):
command = "-ratecorrection {}".format(int(enable))
self._run_command(command, "int")
def get_ratecorrection_deadtime(self):
shape = (self.get_nmodules(),)
tau_ns = self._run_command("-get tau", "float", shape)
return tuple(tau_ns / 1e9)
def set_ratecorrection_deadtime(self, tau):
# Use predefined values
if tau < 0:
tau_ns = -1
# Use provided argument
else:
tau_ns = int(tau * 1e9)
command = "-tau {}".format(tau_ns)
self._run_command(command, "int")
# Trigger / Gate
def enable_continuoustrigger(self, enable):
command = "-conttrigen {}".format(int(enable))
self._run_command(command, "int")
def continuoustrigger_enabled(self):
command = "-get conttrig"
value = self._run_command(command, "int")
return bool(value)
def enable_singletrigger(self, enable):
command = "-trigen {}".format(int(enable))
self._run_command(command, "int")
def singletrigger_enabled(self):
command = "-get trig"
value = self._run_command(command, "int")
return bool(value)
def get_delaybeforeframe(self):
command = "-get delbef"
delay_100ns = self._run_command(command, "long long")
return delay_100ns / 1e7
def set_delaybeforeframe(self, delay=0.):
delay_100ns = int(delay * 1e7)
command = "-delbef {}".format(delay_100ns)
self._run_command(command, "int")
def enable_gatemode(self, enable):
command = "-gateen {}".format(int(enable))
self._run_command(command, "int")
def gatemode_enabled(self):
command = "-get gate"
value = self._run_command(command, "int")
return bool(value)
def get_ngates(self):
command = "-get gates"
return self._run_command(command, "int")
def set_ngates(self, ngates=1):
command = "-gates {}".format(ngates)
self._run_command(command, "int")
def get_inputpolarity(self):
command = "-get inpol"
self._check_version("<=3.0.0", command)
value = self._run_command(command, "int")
return Polarity(value)
def set_inputpolarity(self, polarity):
command = "-inpol {}".format(int(polarity))
self._check_version("<=3.0.0", command)
self._run_command(command, "int")
def get_outputpolarity(self):
command = "-get outpol"
self._check_version("<=3.0.0", command)
value = self._run_command(command, "int")
return Polarity(value)
def set_outputpolarity(self, polarity):
command = "-outpol {}".format(int(polarity))
self._check_version("<=3.0.0", command)
self._run_command(command, "int")
# Debugging
def start_logging(self):
command = "-log start"
self._run_command(command, "int")
def stop_logging(self):
command = "-log stop"
size = self._run_command(command, "int")
command = "-log read"
return self._run_command(command, "char", (size,))
def logging_running(self):
command = "-log status"
self._check_version(">=4.0.1", command)
value = self._run_command(command, "int")
return bool(value)
def test_pattern(self):
command = "-testpattern"
shape = (self.get_nchannels(),)
return self._run_command(command, "int", shape)
......@@ -58,24 +58,32 @@ class Opiom:
self.__program = config_tree["program"]
self.__base_path = config_tree.get("opiom_prg_root", OPIOM_PRG_ROOT)
self.__debug = False
try:
msg = self.comm("?VER", timeout=50e-3)
except serial.SerialTimeout:
msg = self.comm("?VER", timeout=50e-3)
if not msg.startswith("OPIOM"):
# Sometimes, have to talk twice to the OPIOM in order to get the proper first answer.
for ii in range(2):
try:
msg = self.comm("?VER", timeout=50e-3)
except serial.SerialTimeout:
msg = ""
if msg.startswith("OPIOM"):
break
else:
raise IOError("No opiom connected at %s" % serial)
self.comm("MODE normal")
def __repr__(self):
return "Opiom : %s with program %s" % (self._cnx, self.__program)
return "opiom: %s" % self._cnx
def setDebug(self, flag):
self.__debug = flag is True
def getDebug(self):
@property
def debug(self):
return self.__debug
@debug.setter
def debug(self, flag):
self.__debug = bool(flag)
def __debugMsg(self, wr, msg):
if self.__debug:
print "%-5.5s on %s > %s" % (wr, self.name, msg)
......@@ -142,59 +150,80 @@ class Opiom:
self.__debugMsg("Read", msg.strip("\n\r"))
return msg.strip("\r\n")
def load_program(self):
def load_program(self, prog_name=None):
pldid = self.comm("?PLDID")
file_pldid, file_project = self._getFilePLDIDandPROJECT()
if file_pldid and file_pldid != pldid:
print "Load program:", self.__program
srcsz = int(self.comm("?SRCSZ").split()[0])
offsets, opmfile = self._getoffset()
if (offsets["src_c"] - offsets["src_cc"]) < srcsz:
SRCST = offsets["src_cc"]
srcsz = offsets["src_c"] - offsets["src_cc"]
if prog_name is None:
prog_name = self.__program
if prog_name == "default":
if pldid == "255":
# already default
return
else:
SRCST = offsets["src_c"]
srcsz = offsets["jed"] - offsets["src_c"]
binsz = offsets["size"] - offsets["jed"]
sendarray = opmfile[SRCST : SRCST + srcsz]
sendarray += opmfile[offsets["jed"] :]
if self.comm_ack("MODE program") != "OK":
raise IOError("Can't program opiom %s" % str(self))
print "Uploading default program"
else:
try:
file_pldid, file_project = self._getFilePLDIDandPROJECT(prog_name)
except ValueError:
# invalid unpacking
raise IOError(
"opiom %s: cannot find program %s" % (str(self), prog_name)
)
if (
self.comm_ack(
'PROG %d %d %d %d "%s"'
% (binsz, srcsz, self.FSIZE, int(file_pldid), file_project)
if file_pldid and file_pldid != pldid:
print "Uploading opiom program, please wait"
srcsz = int(self.comm("?SRCSZ").split()[0])
offsets, opmfile = self._getoffset(prog_name)
if (offsets["src_c"] - offsets["src_cc"]) < srcsz:
SRCST = offsets["src_cc"]
srcsz = offsets["src_c"] - offsets["src_cc"]
else:
SRCST = offsets["src_c"]
srcsz = offsets["jed"] - offsets["src_c"]
binsz = offsets["size"] - offsets["jed"]
sendarray = opmfile[SRCST : SRCST + srcsz]
sendarray += opmfile[offsets["jed"] :]
else:
# program already loaded
return
if self.comm_ack("MODE program") != "OK":
raise IOError("Can't program opiom %s" % str(self))
if prog_name == "default":
ans = self.comm_ack("PROG DEFAULT")
sendarray = []
else:
ans = self.comm_ack(
'PROG %d %d %d %d "%s"'
% (binsz, srcsz, self.FSIZE, int(file_pldid), file_project)
)
if ans != "OK":
self.comm("MODE normal")
raise IOError("Can't start programming opiom %s" % str(self))
for frame_n, index in enumerate(range(0, len(sendarray), self.FSIZE)):
with KillMask():
cmd = "#*FRM %d\r" % frame_n
self.raw_write(cmd)
self.raw_bin_write(sendarray[index : index + self.FSIZE])
answer = self._cnx.readline("\r\n")
if answer == "OK":
continue
raise RuntimeError(
"Load program: [%s] returned [%s]" % (cmd.strip(), answer)
)
!= "OK"
):
self.comm("MODE normal")
raise IOError("Can't start programming opiom %s" % str(self))
for frame_n, index in enumerate(range(0, len(sendarray), self.FSIZE)):
with KillMask():
cmd = "#*FRM %d\r" % frame_n
self.raw_write(cmd)
self.raw_bin_write(sendarray[index : index + self.FSIZE])
answer = self._cnx.readline("\r\n")
if answer == "OK":
continue
raise RuntimeError(
"Load program: [%s] returned [%s]" % (cmd.strip(), answer)
)
# waiting end programming
while 1:
stat_num = self.comm("?PSTAT")
self.__debugMsg("Load", stat_num)
try:
stat, percent = stat_num.split()
except ValueError:
stat = stat_num
break
return stat == "DONE"
# waiting end programming
while True:
stat_num = self.comm("?PSTAT")
self.__debugMsg("Load", stat_num)
try:
stat, percent = stat_num.split()
except ValueError:
stat = stat_num
break
return stat == "DONE"
def _display_bits(self, prefix, bits):
for i in range(1, 9):
......@@ -208,8 +237,8 @@ class Opiom:
print
def _getoffset(self):
with remote_open(os.path.join(self.__base_path, self.__program + ".opm")) as f:
def _getoffset(self, prog_name):
with remote_open(os.path.join(self.__base_path, prog_name + ".opm")) as f:
line = f.read(14)
f.seek(0)
opmfile = f.read()
......@@ -227,10 +256,10 @@ class Opiom:
opmfile,
)
def _getFilePLDIDandPROJECT(self):
def _getFilePLDIDandPROJECT(self, prog_name):
TOKEN = "#pldid#"
PROJECT_TOKEN = "#project#"
with remote_open(os.path.join(self.__base_path, self.__program + ".opm")) as f:
with remote_open(os.path.join(self.__base_path, prog_name + ".opm")) as f:
begin = -1
for line in f:
begin = line.find(TOKEN)
......
......@@ -32,7 +32,8 @@ provided by the controller itself::
Here's an working example::
>>> from bliss.config.static import get_config
>>> from bliss.common.scans import timescan, get_data
>>> from bliss.common.scans import timescan
>>> from bliss.data.scan import get_data
>>> config = get_config()
>>> pepu = config.get('pepudcm2')
......
......@@ -244,3 +244,4 @@ def session(beacon):
def pytest_addoption(parser):
parser.addoption("--pepu", help="pepu host name")
parser.addoption("--ct2", help="ct2 address")
parser.addoption("--mythen", action="store", help="mythen host name")
......@@ -5,8 +5,6 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import pytest
def pytest_collection_modifyitems(config, items):
devices = ["pepu", "ct2"]
......
"""Hardware testing module for the Mythen."""
import socket
import pytest
from bliss.controllers.mythenlib import MythenInterface, Polarity
from bliss.controllers.mythenlib import MythenCompatibilityError, MythenCommandError
@pytest.fixture(params=[socket.SOCK_STREAM, socket.SOCK_DGRAM], ids=["TCP", "UDP"])
def mythen(request):
hostname = request.config.getoption("--mythen")
if hostname is None:
pytest.xfail("The --mythen hostname option has to be provided.")
mythen = MythenInterface(hostname, request.param)
yield mythen
mythen.close()
def test_errors(mythen):
with pytest.raises(MythenCompatibilityError) as info:
mythen.get_commandsetid()
assert (
str(info.value)
== "[Version 3.0.0] Command '-get commandsetid' requires version >= 4.0.0"
)
with pytest.raises(MythenCommandError) as info:
mythen._run_command("-idontexist", "int")
assert str(info.value) == "[Error -1] Unknown command ('-idontexist')"
def test_common_getters(mythen):
assert mythen.get_version() == (3, 0, 0)
assert mythen.get_nmodules() == 1
assert mythen.get_modchannels() == (1280,)
def test_general_getters(mythen):
assert mythen.get_assemblydate() == "Mon Aug 21 13:06:10 CEST 2017"
assert all(x in (0, 1) for x in mythen.get_badchannels())
assert mythen.get_commandid() + 1 == mythen.get_commandid()
with pytest.raises(MythenCompatibilityError):
mythen.get_commandsetid()
with pytest.raises(MythenCompatibilityError):
mythen.get_dcstemperature()
with pytest.raises(MythenCompatibilityError):
mythen.get_frameratemax()
with pytest.raises(MythenCompatibilityError):
mythen.get_fwversion()
with pytest.raises(MythenCompatibilityError):
mythen.get_humidity()
with pytest.raises(MythenCompatibilityError):
mythen.get_highvoltage()
with pytest.raises(MythenCompatibilityError):
mythen.get_modfwversion()
assert mythen.get_modnum() == ("0x192",)
assert mythen.get_module() == 0xffff
assert mythen.get_nmaxmodules() == 1
assert mythen.get_sensormaterial() == ("silicon",)
assert mythen.get_sensorthickness() == (1000,)
with pytest.raises(MythenCompatibilityError):
mythen.get_sensorwidth()
assert mythen.get_systemnum() == 256
with pytest.raises(MythenCompatibilityError):
mythen.get_temperature()
with pytest.raises(MythenCompatibilityError):
mythen.get_testversion()
def test_general_commands(mythen):
mythen.select_module(0)
assert mythen.get_module() == 0
mythen.select_all_modules()
assert mythen.get_module() == 0xffff
mythen.set_nmodules(1)
assert mythen.get_nmodules() == 1
mythen.reset()
def test_acquisition_settings(mythen):
mythen.set_delayafterframe(1.)
assert mythen.get_delayafterframe() == 1.
mythen.set_delayafterframe()
assert mythen.get_delayafterframe() == 0
mythen.set_nframes(2)
assert mythen.get_nframes() == 2
mythen.set_nframes()
assert mythen.get_nframes() == 1
mythen.set_nbits(4)
assert mythen.get_nbits() == 4
mythen.set_nbits()
assert mythen.get_nbits() == 24
mythen.set_exposure_time(2.)
assert mythen.get_exposure_time() == 2.
mythen.set_exposure_time()
assert mythen.get_exposure_time() == 1.
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
with pytest.raises(MythenCompatibilityError):
mythen.get_readouttimes()
def test_acquisition_control(mythen):
# Configuration
mythen.set_delayafterframe(0.05)
mythen.set_exposure_time(0.05)
mythen.set_nframes(5)
# Start acquisition
mythen.start()
# Check status
status = mythen.get_status()
assert status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Wait for first frame
array = mythen.readout()
assert array.shape == (1, 1280)
# Check status
status = mythen.get_status()
assert status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Wait for the last 4 frames
with pytest.raises(MythenCompatibilityError):
mythen.readout(4)
for _ in range(4):
array = mythen.readout(1)
assert array.shape == (1, 1280)
# Check status
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Stop acquisition
mythen.stop()
# Check status
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
def test_detector_settings(mythen):
mythen.set_energy(9.)
assert mythen.get_energy() == (pytest.approx(9.),)
mythen.set_energy()
assert mythen.get_energy() == (pytest.approx(8.05),)
assert mythen.get_energymin() == (pytest.approx(7.39),)
assert mythen.get_energymax() == (pytest.approx(24.),)
mythen.set_kthresh(7.)
assert mythen.get_kthresh() == (pytest.approx(7.),)
mythen.set_kthresh()
assert mythen.get_kthresh() == (pytest.approx(6.4),)
assert mythen.get_kthreshmin() == (pytest.approx(6.),)
assert mythen.get_kthreshmax() == (pytest.approx(12.),)
mythen.set_kthresh_and_energy(7., 9.)
assert mythen.get_kthresh() == (pytest.approx(7.),)
assert mythen.get_energy() == (pytest.approx(9.),)
mythen.set_kthresh_and_energy()
assert mythen.get_kthresh() == (pytest.approx(6.4),)
assert mythen.get_energy() == (pytest.approx(8.05),)
def test_load_predefined_settings(mythen):
with pytest.raises(ValueError):
mythen.load_predefined_settings("Ar")
for element in ("Cu", "Mo", "Ag"):
mythen.load_predefined_settings(element)
for element in ("Cr",):
with pytest.raises(MythenCommandError):
mythen.load_predefined_settings(element)
def test_data_correction(mythen):
mythen.enable_badchannelinterpolation(True)
assert mythen.badchannelinterpolation_enabled()
mythen.enable_badchannelinterpolation(False)
assert not mythen.badchannelinterpolation_enabled()
mythen.enable_flatfieldcorrection(True)
assert mythen.flatfieldcorrection_enabled()
mythen.enable_flatfieldcorrection(False)
assert not mythen.flatfieldcorrection_enabled()
with pytest.raises(MythenCompatibilityError):
mythen.set_flatfield(0, [1, 2, 3])
with pytest.raises(MythenCompatibilityError):