...
 
Commits (18)
...@@ -23,6 +23,7 @@ from .connection import SpecConnection ...@@ -23,6 +23,7 @@ from .connection import SpecConnection
from .reply import SpecReply from .reply import SpecReply
from .wait import waitConnection from .wait import waitConnection
from .error import SpecClientTimeoutError, SpecClientError, SpecClientNotConnectedError from .error import SpecClientTimeoutError, SpecClientError, SpecClientNotConnectedError
from bliss.common import event
class wrap_errors(object): class wrap_errors(object):
...@@ -157,7 +158,7 @@ class SpecCommandA(BaseSpecCommand): ...@@ -157,7 +158,7 @@ class SpecCommandA(BaseSpecCommand):
event.disconnect(self.connection, "connected", self._connected) event.disconnect(self.connection, "connected", self._connected)
event.disconnect(self.connection, "disconnected", self._disconnected) event.disconnect(self.connection, "disconnected", self._disconnected)
self.connection = connection.SpecConnection(specVersion) self.connection = SpecConnection(specVersion)
self.specVersion = specVersion self.specVersion = specVersion
event.connect(self.connection, "connected", self._connected) event.connect(self.connection, "connected", self._connected)
...@@ -193,22 +194,14 @@ class SpecCommandA(BaseSpecCommand): ...@@ -193,22 +194,14 @@ class SpecCommandA(BaseSpecCommand):
self.beginWait() self.beginWait()
with gevent.Timeout(timeout, SpecClientTimeoutError): with gevent.Timeout(timeout, SpecClientTimeoutError):
waiter = SpecWaitObject(self.connection) if isinstance(command, str):
waiter.waitConnection()
if self.connection.serverVersion < 3:
id = self.connection.send_msg_cmd_with_return( id = self.connection.send_msg_cmd_with_return(
command, self.replyArrived command, self.replyArrived
) )
else: else:
if isinstance(command, str): id = self.connection.send_msg_func_with_return(
id = self.connection.send_msg_cmd_with_return( command, self.replyArrived
command, self.replyArrived )
)
else:
id = self.connection.send_msg_func_with_return(
command, self.replyArrived
)
t = gevent.spawn(wrap_errors(wait_end_of_spec_cmd), self) t = gevent.spawn(wrap_errors(wait_end_of_spec_cmd), self)
...@@ -250,30 +243,3 @@ class SpecCommandA(BaseSpecCommand): ...@@ -250,30 +243,3 @@ class SpecCommandA(BaseSpecCommand):
return return
self.connection.abort() self.connection.abort()
class SpecCommand(SpecCommandA):
def __init__(self, *args, **kwargs):
SpecCommandA.__init__(self, *args, **kwargs)
def connectToSpec(self, specVersion, timeout):
SpecCommandA.connectToSpec(self, specVersion, timeout)
if not self.connection.isSpecConnected():
with gevent.Timeout(timeout, SpecClientTimeoutError):
waitConnection(self.connection, timeout)
self._connected()
def abort(self):
if self.connection is None or not self.connection.isSpecConnected():
return
self.connection.abort(wait=True)
def __call__(self, *args, **kwargs):
wait = kwargs.get("wait", True)
timeout = kwargs.get("timeout", None)
return SpecCommandA.__call__(self, *args, wait=wait, timeout=timeout)
def executeCommand(self, command, wait=True, timeout=None):
return SpecCommandA.executeCommand(self, command, wait, timeout)
...@@ -223,20 +223,6 @@ class Shutter(object): ...@@ -223,20 +223,6 @@ class Shutter(object):
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
self.close() self.close()
@property
def is_open(self):
return self.state == self.OPEN
@property
def is_closed(self):
return self.state == self.CLOSED
def __enter__(self):
self.open()
def __exit__(self, exc_type, exc_value, traceback):
self.close()
@property @property
def external_control(self): def external_control(self):
return self._external_ctrl return self._external_ctrl
......
...@@ -196,7 +196,7 @@ class Icepap(Controller): ...@@ -196,7 +196,7 @@ class Icepap(Controller):
status = axis._state() status = axis._state()
else: else:
last_power_time = self._last_axis_power_time.get(axis, 0) last_power_time = self._last_axis_power_time.get(axis, 0)
if time.time() - last_power_time < 1.0: if time.time() - last_power_time < 1.0 and not isinstance(axis, LinkedAxis):
status = int(_command(self._cnx, "%s:?STATUS" % axis.address), 16) status = int(_command(self._cnx, "%s:?STATUS" % axis.address), 16)
else: else:
self._last_axis_power_time.pop(axis, None) self._last_axis_power_time.pop(axis, None)
......
...@@ -5,8 +5,15 @@ ...@@ -5,8 +5,15 @@
# Copyright (c) 2017 Beamline Control Unit, ESRF # Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info. # Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.common.axis import Axis, NoSettingsAxis, DEFAULT_POLLING_TIME, lazy_init from bliss.common.axis import (
Axis,
NoSettingsAxis,
DEFAULT_POLLING_TIME,
lazy_init,
Motion,
)
from . import _ackcommand, _command from . import _ackcommand, _command
import gevent
class LinkedAxis(Axis): class LinkedAxis(Axis):
...@@ -133,15 +140,20 @@ class LinkedAxis(Axis): ...@@ -133,15 +140,20 @@ class LinkedAxis(Axis):
# create motion object for hooks # create motion object for hooks
motion = Motion(self, switch, None, "homing") motion = Motion(self, switch, None, "homing")
self.__execute_pre_move_hook(motion) self._Axis__execute_pre_move_hook(motion)
def start_one(controller, motions): def start_one(controller, motions):
cnx = controller._cnx cnx = controller._cnx
cmd = "HOME STRICT %s %s" % ( home_dir = "+1" if motions[0].target_pos > 0 else "-1"
motions[0].axis.address, cmd = "#HOME STRICT %s" % " ".join(
("+1" if motions[0].target_pos > 0 else "-1"), (str(rm.address) + " " + home_dir for rm in self.real_axes)
)
_command(
cnx,
cmd,
pre_cmd="#DISPROT ALL %s ; "
% " ".join((str(rm.address) for rm in self.real_axes)),
) )
_ackcommand(cnx, cmd)
# IcePAP status is not immediately MOVING after home search command is sent # IcePAP status is not immediately MOVING after home search command is sent
gevent.sleep(0.2) gevent.sleep(0.2)
......
...@@ -461,7 +461,15 @@ class musst(object): ...@@ -461,7 +461,15 @@ class musst(object):
buffer_size, nb_buffer = self.get_event_buffer_size() buffer_size, nb_buffer = self.get_event_buffer_size()
buffer_memory = buffer_size * nb_buffer buffer_memory = buffer_size * nb_buffer
current_offset, current_buffer_id = self.get_event_memory_pointer() for i in range(10):
curr_state = self.STATE
current_offset, current_buffer_id = self.get_event_memory_pointer()
if current_buffer_id == 0 and current_offset != 64:
break
if curr_state != self.RUN_STATE:
break
gevent.sleep(100e-3) # wait a little bit before re-asking
current_offset = current_buffer_id * buffer_size + current_offset current_offset = current_buffer_id * buffer_size + current_offset
from_offset = (from_event_id * nb_counters) % buffer_memory from_offset = (from_event_id * nb_counters) % buffer_memory
......
"""Controller for the mythen 1d detector."""
import numpy
from .mythenlib import MythenInterface
from bliss.common.measurement import BaseCounter, counter_namespace
from bliss.scanning.chain import AcquisitionDevice, AcquisitionChannel
def interface_property(name, mode=False):
getter_name = name + "_enabled" if mode else "get_" + name
setter_name = "enable_" + name if mode else "set_" + name
assert getter_name in dir(MythenInterface)
assert setter_name in dir(MythenInterface)
def getter(self):
return getattr(self._interface, getter_name)()
def setter(self, value):
return getattr(self._interface, setter_name)(value)
return property(getter, setter)
class Mythen(object):
_settings = [
# General configuration
"nmodules",
# Acquisition configuration
"delay_after_frame",
"nframes",
"nbits",
"exposure_time",
# Detector configuration
"energy",
"threshold",
# Data correction settings
"bad_channel_interpolation",
"flat_field_correction",
"rate_correction",
"rate_correction_deadtime",
# Trigger / Gate settings
"continuous_trigger_mode",
"single_trigger_mode",
"delay_before_frame",
"gate_mode",
"ngates",
"input_polarity",
"output_polarity",
]
def __init__(self, name, config):
self._name = name
self._config = config
self._hostname = config["hostname"]
self._interface = MythenInterface(self._hostname)
self._apply_configuration()
self._counter = MythenCounter(self)
def finalize(self):
self._interface.close()
# Counter access
@property
def counters(self):
return counter_namespace([self._counter])
# Manage configuration
def _apply_configuration(self):
if self._config.get("apply_defaults"):
self.reset()
for key, value in self._config.items():
if key in self._settings:
setattr(self, key, value)
def _get_configuration(self):
return [(key, getattr(self, key)) for key in self._settings]
def __repr__(self):
lines = ["Mythen on {}:".format(self._hostname)]
lines += [
" {:<25s} = {}".format(key, value)
for key, value in self._get_configuration()
]
return "\n".join(lines)
# Bliss properties
@property
def name(self):
return self._name
@property
def hostname(self):
return self._hostname
# General configuration
nmodules = interface_property("nmodules")
# Acquisition configuration
delay_after_frame = interface_property("delayafterframe")
nframes = interface_property("nframes")
nbits = interface_property("nbits")
exposure_time = interface_property("exposure_time")
# Detector configuration
energy = interface_property("energy")
threshold = interface_property("kthresh")
# Data correction configuration
bad_channel_interpolation = interface_property("badchannelinterpolation", mode=True)
flat_field_correction = interface_property("flatfieldcorrection", mode=True)
rate_correction = interface_property("ratecorrection", mode=True)
rate_correction_deadtime = interface_property("ratecorrection_deadtime")
# Trigger / Gate configuration
continuous_trigger_mode = interface_property("continuoustrigger", mode=True)
single_trigger_mode = interface_property("singletrigger", mode=True)
gate_mode = interface_property("gatemode", mode=True)
delay_before_frame = interface_property("delaybeforeframe")
ngates = interface_property("ngates")
input_polarity = interface_property("inputpolarity")
output_polarity = interface_property("outputpolarity")
# Expose all interface getters
def __getattr__(self, attr):
if attr.startswith("get_") and attr in dir(self._interface):
return getattr(self._interface, attr)
raise AttributeError(attr)
def __dir__(self):
lst = list(self.__dict__)
lst += dir(type(self))
lst += [key for key in dir(self._interface) if key.startswith("get_")]
return sorted(set(lst))
# Commands
def reset(self):
self._interface.reset()
def start(self):
self._interface.start()
def stop(self):
self._interface.stop()
def readout(self):
return self._interface.readout(1)[0]
# Acquisition routine
def run(self, acquisition_number=1, acquisition_time=1.):
self.nframes = acquisition_number
self.exposure_time = acquisition_time
try:
self.start()
for _ in range(acquisition_number):
yield self.readout()
finally:
self.stop()
# Mythen counter
class MythenCounter(BaseCounter):
# Initialization
def __init__(self, controller):
self._name = "spectrum"
self._controller = controller
@property
def name(self):
return self._name
@property
def controller(self):
return self._controller
# Data properties
@property
def dtype(self):
return numpy.int32
@property
def shape(self):
return (self.controller.get_nchannels(),)
# Get acquisition device
def create_acquisition_device(self, scan_pars, **settings):
scan_pars.update(settings)
count_time = scan_pars.pop("count_time")
return MythenAcquistionDevice(self, count_time, **scan_pars)
class MythenAcquistionDevice(AcquisitionDevice):
# Initialization
def __init__(self, counter, count_time, **kwargs):
self.kwargs = kwargs
self.counter = counter
self.count_time = count_time
valid_names = ("npoints", "trigger_type", "prepare_once", "start_once")
valid_kwargs = {
key: value for key, value in kwargs.items() if key in valid_names
}
super(MythenAcquistionDevice, self).__init__(
counter.controller, counter.controller.name, **valid_kwargs
)
self.channels.append(
AcquisitionChannel(counter.name, counter.dtype, counter.shape)
)
def add_counter(self, counter):
assert self.counter == counter
# Flow control
def prepare(self):
self.device.nframes = self.npoints
self.device.exposure_time = self.count_time
def start(self):
self.device.start()
def trigger(self):
# Software trigger is not supported by the mythen
pass
def reading(self):
spectrum = self.device.readout()
self.channels.update({self.counter.name: spectrum})
def stop(self):
self.device.stop()
This diff is collapsed.
...@@ -58,24 +58,32 @@ class Opiom: ...@@ -58,24 +58,32 @@ class Opiom:
self.__program = config_tree["program"] self.__program = config_tree["program"]
self.__base_path = config_tree.get("opiom_prg_root", OPIOM_PRG_ROOT) self.__base_path = config_tree.get("opiom_prg_root", OPIOM_PRG_ROOT)
self.__debug = False self.__debug = False
try:
msg = self.comm("?VER", timeout=50e-3)
except serial.SerialTimeout:
msg = self.comm("?VER", timeout=50e-3)
if not msg.startswith("OPIOM"): # Sometimes, have to talk twice to the OPIOM in order to get the proper first answer.
for ii in range(2):
try:
msg = self.comm("?VER", timeout=50e-3)
except serial.SerialTimeout:
msg = ""
if msg.startswith("OPIOM"):
break
else:
raise IOError("No opiom connected at %s" % serial) raise IOError("No opiom connected at %s" % serial)
self.comm("MODE normal") self.comm("MODE normal")
def __repr__(self): def __repr__(self):
return "Opiom : %s with program %s" % (self._cnx, self.__program) return "opiom: %s" % self._cnx
def setDebug(self, flag): @property
self.__debug = flag is True def debug(self):
def getDebug(self):
return self.__debug return self.__debug
@debug.setter
def debug(self, flag):
self.__debug = bool(flag)
def __debugMsg(self, wr, msg): def __debugMsg(self, wr, msg):
if self.__debug: if self.__debug:
print "%-5.5s on %s > %s" % (wr, self.name, msg) print "%-5.5s on %s > %s" % (wr, self.name, msg)
...@@ -142,59 +150,80 @@ class Opiom: ...@@ -142,59 +150,80 @@ class Opiom:
self.__debugMsg("Read", msg.strip("\n\r")) self.__debugMsg("Read", msg.strip("\n\r"))
return msg.strip("\r\n") return msg.strip("\r\n")
def load_program(self): def load_program(self, prog_name=None):
pldid = self.comm("?PLDID") pldid = self.comm("?PLDID")
file_pldid, file_project = self._getFilePLDIDandPROJECT() if prog_name is None:
if file_pldid and file_pldid != pldid: prog_name = self.__program
print "Load program:", self.__program if prog_name == "default":
srcsz = int(self.comm("?SRCSZ").split()[0]) if pldid == "255":
offsets, opmfile = self._getoffset() # already default
if (offsets["src_c"] - offsets["src_cc"]) < srcsz: return
SRCST = offsets["src_cc"]
srcsz = offsets["src_c"] - offsets["src_cc"]
else: else:
SRCST = offsets["src_c"] print "Uploading default program"
srcsz = offsets["jed"] - offsets["src_c"] else:
binsz = offsets["size"] - offsets["jed"] try:
file_pldid, file_project = self._getFilePLDIDandPROJECT(prog_name)
sendarray = opmfile[SRCST : SRCST + srcsz] except ValueError:
sendarray += opmfile[offsets["jed"] :] # invalid unpacking
raise IOError(
if self.comm_ack("MODE program") != "OK": "opiom %s: cannot find program %s" % (str(self), prog_name)
raise IOError("Can't program opiom %s" % str(self)) )
if ( if file_pldid and file_pldid != pldid:
self.comm_ack( print "Uploading opiom program, please wait"
'PROG %d %d %d %d "%s"' srcsz = int(self.comm("?SRCSZ").split()[0])
% (binsz, srcsz, self.FSIZE, int(file_pldid), file_project) offsets, opmfile = self._getoffset(prog_name)
if (offsets["src_c"] - offsets["src_cc"]) < srcsz:
SRCST = offsets["src_cc"]
srcsz = offsets["src_c"] - offsets["src_cc"]
else:
SRCST = offsets["src_c"]
srcsz = offsets["jed"] - offsets["src_c"]
binsz = offsets["size"] - offsets["jed"]
sendarray = opmfile[SRCST : SRCST + srcsz]
sendarray += opmfile[offsets["jed"] :]
else:
# program already loaded
return
if self.comm_ack("MODE program") != "OK":
raise IOError("Can't program opiom %s" % str(self))
if prog_name == "default":
ans = self.comm_ack("PROG DEFAULT")
sendarray = []
else:
ans = self.comm_ack(
'PROG %d %d %d %d "%s"'
% (binsz, srcsz, self.FSIZE, int(file_pldid), file_project)
)
if ans != "OK":
self.comm("MODE normal")
raise IOError("Can't start programming opiom %s" % str(self))
for frame_n, index in enumerate(range(0, len(sendarray), self.FSIZE)):
with KillMask():
cmd = "#*FRM %d\r" % frame_n
self.raw_write(cmd)
self.raw_bin_write(sendarray[index : index + self.FSIZE])
answer = self._cnx.readline("\r\n")
if answer == "OK":
continue
raise RuntimeError(
"Load program: [%s] returned [%s]" % (cmd.strip(), answer)
) )
!= "OK"
): # waiting end programming
self.comm("MODE normal") while True:
raise IOError("Can't start programming opiom %s" % str(self)) stat_num = self.comm("?PSTAT")
self.__debugMsg("Load", stat_num)
for frame_n, index in enumerate(range(0, len(sendarray), self.FSIZE)): try:
with KillMask(): stat, percent = stat_num.split()
cmd = "#*FRM %d\r" % frame_n except ValueError:
self.raw_write(cmd) stat = stat_num
self.raw_bin_write(sendarray[index : index + self.FSIZE]) break
answer = self._cnx.readline("\r\n") return stat == "DONE"
if answer == "OK":
continue
raise RuntimeError(
"Load program: [%s] returned [%s]" % (cmd.strip(), answer)
)
# waiting end programming
while 1:
stat_num = self.comm("?PSTAT")
self.__debugMsg("Load", stat_num)
try:
stat, percent = stat_num.split()
except ValueError:
stat = stat_num
break
return stat == "DONE"
def _display_bits(self, prefix, bits): def _display_bits(self, prefix, bits):
for i in range(1, 9): for i in range(1, 9):
...@@ -208,8 +237,8 @@ class Opiom: ...@@ -208,8 +237,8 @@ class Opiom:
print print
def _getoffset(self): def _getoffset(self, prog_name):
with remote_open(os.path.join(self.__base_path, self.__program + ".opm")) as f: with remote_open(os.path.join(self.__base_path, prog_name + ".opm")) as f:
line = f.read(14) line = f.read(14)
f.seek(0) f.seek(0)
opmfile = f.read() opmfile = f.read()
...@@ -227,10 +256,10 @@ class Opiom: ...@@ -227,10 +256,10 @@ class Opiom:
opmfile, opmfile,
) )
def _getFilePLDIDandPROJECT(self): def _getFilePLDIDandPROJECT(self, prog_name):
TOKEN = "#pldid#" TOKEN = "#pldid#"
PROJECT_TOKEN = "#project#" PROJECT_TOKEN = "#project#"
with remote_open(os.path.join(self.__base_path, self.__program + ".opm")) as f: with remote_open(os.path.join(self.__base_path, prog_name + ".opm")) as f:
begin = -1 begin = -1
for line in f: for line in f:
begin = line.find(TOKEN) begin = line.find(TOKEN)
......
...@@ -32,7 +32,8 @@ provided by the controller itself:: ...@@ -32,7 +32,8 @@ provided by the controller itself::
Here's an working example:: Here's an working example::
>>> from bliss.config.static import get_config >>> from bliss.config.static import get_config
>>> from bliss.common.scans import timescan, get_data >>> from bliss.common.scans import timescan
>>> from bliss.data.scan import get_data
>>> config = get_config() >>> config = get_config()
>>> pepu = config.get('pepudcm2') >>> pepu = config.get('pepudcm2')
......
...@@ -244,3 +244,4 @@ def session(beacon): ...@@ -244,3 +244,4 @@ def session(beacon):
def pytest_addoption(parser): def pytest_addoption(parser):
parser.addoption("--pepu", help="pepu host name") parser.addoption("--pepu", help="pepu host name")
parser.addoption("--ct2", help="ct2 address") parser.addoption("--ct2", help="ct2 address")
parser.addoption("--mythen", action="store", help="mythen host name")
...@@ -5,8 +5,6 @@ ...@@ -5,8 +5,6 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF # Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info. # Distributed under the GNU LGPLv3. See LICENSE for more info.
import pytest
def pytest_collection_modifyitems(config, items): def pytest_collection_modifyitems(config, items):
devices = ["pepu", "ct2"] devices = ["pepu", "ct2"]
......
"""Hardware testing module for the Mythen."""
import socket
import pytest
from bliss.controllers.mythenlib import MythenInterface, Polarity
from bliss.controllers.mythenlib import MythenCompatibilityError, MythenCommandError
@pytest.fixture(params=[socket.SOCK_STREAM, socket.SOCK_DGRAM], ids=["TCP", "UDP"])
def mythen(request):
hostname = request.config.getoption("--mythen")
if hostname is None:
pytest.xfail("The --mythen hostname option has to be provided.")
mythen = MythenInterface(hostname, request.param)
yield mythen
mythen.close()
def test_errors(mythen):
with pytest.raises(MythenCompatibilityError) as info:
mythen.get_commandsetid()
assert (
str(info.value)
== "[Version 3.0.0] Command '-get commandsetid' requires version >= 4.0.0"
)
with pytest.raises(MythenCommandError) as info:
mythen._run_command("-idontexist", "int")
assert str(info.value) == "[Error -1] Unknown command ('-idontexist')"
def test_common_getters(mythen):
assert mythen.get_version() == (3, 0, 0)
assert mythen.get_nmodules() == 1
assert mythen.get_modchannels() == (1280,)
def test_general_getters(mythen):
assert mythen.get_assemblydate() == "Mon Aug 21 13:06:10 CEST 2017"
assert all(x in (0, 1) for x in mythen.get_badchannels())
assert mythen.get_commandid() + 1 == mythen.get_commandid()
with pytest.raises(MythenCompatibilityError):
mythen.get_commandsetid()
with pytest.raises(MythenCompatibilityError):
mythen.get_dcstemperature()
with pytest.raises(MythenCompatibilityError):
mythen.get_frameratemax()
with pytest.raises(MythenCompatibilityError):
mythen.get_fwversion()
with pytest.raises(MythenCompatibilityError):
mythen.get_humidity()
with pytest.raises(MythenCompatibilityError):
mythen.get_highvoltage()
with pytest.raises(MythenCompatibilityError):
mythen.get_modfwversion()
assert mythen.get_modnum() == ("0x192",)
assert mythen.get_module() == 0xffff
assert mythen.get_nmaxmodules() == 1
assert mythen.get_sensormaterial() == ("silicon",)
assert mythen.get_sensorthickness() == (1000,)
with pytest.raises(MythenCompatibilityError):
mythen.get_sensorwidth()
assert mythen.get_systemnum() == 256
with pytest.raises(MythenCompatibilityError):
mythen.get_temperature()
with pytest.raises(MythenCompatibilityError):
mythen.get_testversion()
def test_general_commands(mythen):
mythen.select_module(0)
assert mythen.get_module() == 0
mythen.select_all_modules()
assert mythen.get_module() == 0xffff
mythen.set_nmodules(1)
assert mythen.get_nmodules() == 1
mythen.reset()
def test_acquisition_settings(mythen):
mythen.set_delayafterframe(1.)
assert mythen.get_delayafterframe() == 1.
mythen.set_delayafterframe()
assert mythen.get_delayafterframe() == 0
mythen.set_nframes(2)
assert mythen.get_nframes() == 2
mythen.set_nframes()
assert mythen.get_nframes() == 1
mythen.set_nbits(4)
assert mythen.get_nbits() == 4
mythen.set_nbits()
assert mythen.get_nbits() == 24
mythen.set_exposure_time(2.)
assert mythen.get_exposure_time() == 2.
mythen.set_exposure_time()
assert mythen.get_exposure_time() == 1.
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
with pytest.raises(MythenCompatibilityError):
mythen.get_readouttimes()
def test_acquisition_control(mythen):
# Configuration
mythen.set_delayafterframe(0.05)
mythen.set_exposure_time(0.05)
mythen.set_nframes(5)
# Start acquisition
mythen.start()
# Check status
status = mythen.get_status()
assert status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Wait for first frame
array = mythen.readout()
assert array.shape == (1, 1280)
# Check status
status = mythen.get_status()
assert status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Wait for the last 4 frames
with pytest.raises(MythenCompatibilityError):
mythen.readout(4)
for _ in range(4):
array = mythen.readout(1)
assert array.shape == (1, 1280)
# Check status
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Stop acquisition
mythen.stop()
# Check status
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
def test_detector_settings(mythen):
mythen.set_energy(9.)
assert mythen.get_energy() == (pytest.approx(9.),)
mythen.set_energy()
assert mythen.get_energy() == (pytest.approx(8.05),)
assert mythen.get_energymin() == (pytest.approx(7.39),)
assert mythen.get_energymax() == (pytest.approx(24.),)
mythen.set_kthresh(7.)
assert mythen.get_kthresh() == (pytest.approx(7.),)
mythen.set_kthresh()
assert mythen.get_kthresh() == (pytest.approx(6.4),)
assert mythen.get_kthreshmin() == (pytest.approx(6.),)
assert mythen.get_kthreshmax() == (pytest.approx(12.),)
mythen.set_kthresh_and_energy(7., 9.)
assert mythen.get_kthresh() == (pytest.approx(7.),)
assert mythen.get_energy() == (pytest.approx(9.),)
mythen.set_kthresh_and_energy()
assert mythen.get_kthresh() == (pytest.approx(6.4),)
assert mythen.get_energy() == (pytest.approx(8.05),)
def test_load_predefined_settings(mythen):
with pytest.raises(ValueError):
mythen.load_predefined_settings("Ar")
for element in ("Cu", "Mo", "Ag"):
mythen.load_predefined_settings(element)
for element in ("Cr",):
with pytest.raises(MythenCommandError):
mythen.load_predefined_settings(element)
def test_data_correction(mythen):
mythen.enable_badchannelinterpolation(True)
assert mythen.badchannelinterpolation_enabled()
mythen.enable_badchannelinterpolation(False)
assert not mythen.badchannelinterpolation_enabled()
mythen.enable_flatfieldcorrection(True)
assert mythen.flatfieldcorrection_enabled()
mythen.enable_flatfieldcorrection(False)
assert not mythen.flatfieldcorrection_enabled()
with pytest.raises(MythenCompatibilityError):
mythen.set_flatfield(0, [1, 2, 3])
with pytest.raises(MythenCompatibilityError):
mythen.load_flatfield(1)
assert mythen.get_flatfield().shape == (1280,)
assert mythen.get_flatfield_cutoff() == 16777216
mythen.enable_ratecorrection(True)
assert mythen.ratecorrection_enabled()
mythen.enable_ratecorrection(False)
assert not mythen.ratecorrection_enabled()
mythen.set_ratecorrection_deadtime(400 * 1e-9)
assert mythen.get_ratecorrection_deadtime() == pytest.approx(400 * 1e-9)
mythen.set_ratecorrection_deadtime(-1)
default = 140.400573 * 1e-9
assert mythen.get_ratecorrection_deadtime() == pytest.approx(default)
def test_trigger_and_gate(mythen):
mythen.enable_continuoustrigger(True)
assert mythen.continuoustrigger_enabled()
mythen.enable_continuoustrigger(False)
assert not mythen.continuoustrigger_enabled()
mythen.enable_singletrigger(True)
assert mythen.singletrigger_enabled()
mythen.enable_singletrigger(False)
assert not mythen.singletrigger_enabled()
mythen.set_delaybeforeframe(.5)
assert mythen.get_delaybeforeframe() == pytest.approx(.5)
mythen.set_delaybeforeframe()
assert mythen.get_delaybeforeframe() == pytest.approx(0.)
mythen.enable_gatemode(True)
assert mythen.gatemode_enabled()
mythen.enable_gatemode(False)
assert not mythen.gatemode_enabled()
mythen.set_ngates(10)
assert mythen.get_ngates() == 10
mythen.set_ngates()
assert mythen.get_ngates() == 1
mythen.set_inputpolarity(Polarity.High)
assert mythen.get_inputpolarity() == Polarity.High
mythen.set_inputpolarity(Polarity.Low)
assert mythen.get_inputpolarity() == Polarity.Low
mythen.set_outputpolarity(Polarity.High)
assert mythen.get_outputpolarity() == Polarity.High
mythen.set_outputpolarity(Polarity.Low)
assert mythen.get_outputpolarity() == Polarity.Low
def test_debugging(mythen):
mythen.start_logging()
with pytest.raises(MythenCompatibilityError):
assert mythen.logging_running()
logs = mythen.stop_logging()
assert "Command: -log argument: stop" in logs
data = mythen.test_pattern()
assert data.shape == (1280,)
assert list(data) == list(range(1280))
@pytest.mark.parametrize("nbits", [4, 8, 16, 24])
def test_raw_readout(mythen, nbits):
# Configuration
mythen.set_nbits(nbits)
mythen.set_delayafterframe(0.05)
mythen.set_exposure_time(0.05)
mythen.set_nframes(1)
# Start acquisition
mythen.start()
# Wait for the frame
array = mythen.raw_readout()
assert array.shape == (1280,)
# Stop acquisition
mythen.stop()
import pytest
import numpy as np
from mock import Mock
from bliss.common import scans
from bliss.controllers.mythen import Mythen
from bliss.controllers import mythenlib
@pytest.fixture
def run_command(monkeypatch):
setters = {}
commands = []
def run_command(sock, command, return_type="int", return_shape=(), payload=b""):
# General commands
if command == "-get version":
return "v3.0.0"
if command == "-get nmodules":
return 1
# Detect setters
args = command[1:].split(" ")
if args[0] == "readout":
assert args[1:] == ["1"]
args = args[:1]
if len(args) == 2 and args[0] != "get":
setters[args[0]] = args[1]
if len(args) == 1:
commands.append(args[0])
# Return values
if return_shape == ():
return {
"int": 0,
"float": 1.23e9 if "tau" in command else 4.56,
"long long": 31400000,
}[return_type]
if return_shape == (1,):
return np.array([run_command(sock, command, return_type, (), payload)])
if return_shape == (1, 1280):
return np.array(range(1280)).reshape(return_shape)
# Not managed
assert False
monkeypatch.setattr(mythenlib, "socket", Mock())
monkeypatch.setattr(mythenlib, "run_command", run_command)
run_command.setters = setters
run_command.commands = commands
yield run_command
def test_myten_basic(run_command):
m = Mythen("test", {"hostname": "mymythen"})
assert m.name == "test"
assert m.hostname == "mymythen"
m._interface._sock.connect.assert_called_once_with(("mymythen", 1030))
assert (
repr(m)
== """\
Mythen on mymythen:
nmodules = 1
delay_after_frame = 3.14
nframes = 0
nbits = 0
exposure_time = 3.14
energy = (4.56,)
threshold = (4.56,)
bad_channel_interpolation = False
flat_field_correction = False
rate_correction = False
rate_correction_deadtime = (1.23,)
continuous_trigger_mode = False
single_trigger_mode = False
delay_before_frame = 3.14
gate_mode = False
ngates = 0
input_polarity = 0
output_polarity = 0"""
)
m.finalize()
m._interface._sock.close.assert_called_once_with()
def test_myten_configuration(run_command):
Mythen("test", {"hostname": "mymythen", "energy": 8.88, "threshold": 4.44})
assert run_command.commands == []
assert run_command.setters == {"energy": "8.88", "kthresh": "4.44"}
def test_myten_reset_configuration(run_command):
Mythen("test", {"hostname": "mymythen", "apply_defaults": True, "gate_mode": True})
assert run_command.commands == ["reset"]
assert run_command.setters == {"gateen": "1"}
def test_mythen_readonly_getters(run_command):
m = Mythen("test", {"hostname": "mymythen"})
assert "get_version" in dir(m)
assert m.get_version() == (3, 0, 0)
def test_mythen_commands(run_command):
m = Mythen("test", {"hostname": "mymythen"})
m.reset()
m.start()
assert list(m.readout()) == list(range(1280))
m.stop()
assert run_command.commands == ["reset", "start", "readout", "stop"]
def test_mythen_run(run_command):
m = Mythen("test", {"hostname": "mymythen"})
data = list(m.run(10, 0.1))
assert len(data) == 10
for frame in data:
assert list(frame) == list(range(1280))
assert run_command.commands == ["start"] + ["readout"] * 10 + ["stop"]
assert run_command.setters == {"frames": "10", "time": "1000000"}
def test_myten_counter(run_command):
m = Mythen("test", {"hostname": "mymythen"})
counter = m.counters.spectrum
assert counter.name == "spectrum"
assert counter.dtype == np.int32
assert counter.shape == (1280,)
assert counter.controller == m
def test_mythen_from_config(run_command, beacon):
m = beacon.get("mythen1")
assert m.hostname == "mymythen"
assert run_command.commands == ["reset"]
def test_mythen_ct_scan(run_command, beacon):
mythen = beacon.get("mythen1")
scan = scans.ct(0.1, mythen, return_scan=True, save=False)
data = scan.get_data()["spectrum"]
assert data.shape == (1, 1280)
assert list(data[0]) == list(range(1280))
def test_mythen_default_chain_with_counter_namespace(run_command, beacon):
m0 = beacon.get("m0")
mythen = beacon.get("mythen1")
scan = scans.ascan(m0, 0, 10, 3, 0.1, mythen, return_scan=True, save=False)
data = scan.get_data()["spectrum"]
assert data.shape == (3, 1280)
assert np.array_equal(data, [list(range(1280))] * 3)
name: mythen1
module: mythen
class: Mythen
plugin: bliss
hostname: mymythen
apply_defaults: True
\ No newline at end of file
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
- spec_m1 - spec_m1
- mot_1_disable_cache - mot_1_disable_cache
- mot_2_disable_cache - mot_2_disable_cache
- mythen1
synoptic: synoptic:
svg-file: ./test_synoptic.svg svg-file: ./test_synoptic.svg
elements: elements:
......