...
 
Commits (18)
......@@ -23,6 +23,7 @@ from .connection import SpecConnection
from .reply import SpecReply
from .wait import waitConnection
from .error import SpecClientTimeoutError, SpecClientError, SpecClientNotConnectedError
from bliss.common import event
class wrap_errors(object):
......@@ -157,7 +158,7 @@ class SpecCommandA(BaseSpecCommand):
event.disconnect(self.connection, "connected", self._connected)
event.disconnect(self.connection, "disconnected", self._disconnected)
self.connection = connection.SpecConnection(specVersion)
self.connection = SpecConnection(specVersion)
self.specVersion = specVersion
event.connect(self.connection, "connected", self._connected)
......@@ -193,22 +194,14 @@ class SpecCommandA(BaseSpecCommand):
self.beginWait()
with gevent.Timeout(timeout, SpecClientTimeoutError):
waiter = SpecWaitObject(self.connection)
waiter.waitConnection()
if self.connection.serverVersion < 3:
if isinstance(command, str):
id = self.connection.send_msg_cmd_with_return(
command, self.replyArrived
)
else:
if isinstance(command, str):
id = self.connection.send_msg_cmd_with_return(
command, self.replyArrived
)
else:
id = self.connection.send_msg_func_with_return(
command, self.replyArrived
)
id = self.connection.send_msg_func_with_return(
command, self.replyArrived
)
t = gevent.spawn(wrap_errors(wait_end_of_spec_cmd), self)
......@@ -250,30 +243,3 @@ class SpecCommandA(BaseSpecCommand):
return
self.connection.abort()
class SpecCommand(SpecCommandA):
def __init__(self, *args, **kwargs):
SpecCommandA.__init__(self, *args, **kwargs)
def connectToSpec(self, specVersion, timeout):
SpecCommandA.connectToSpec(self, specVersion, timeout)
if not self.connection.isSpecConnected():
with gevent.Timeout(timeout, SpecClientTimeoutError):
waitConnection(self.connection, timeout)
self._connected()
def abort(self):
if self.connection is None or not self.connection.isSpecConnected():
return
self.connection.abort(wait=True)
def __call__(self, *args, **kwargs):
wait = kwargs.get("wait", True)
timeout = kwargs.get("timeout", None)
return SpecCommandA.__call__(self, *args, wait=wait, timeout=timeout)
def executeCommand(self, command, wait=True, timeout=None):
return SpecCommandA.executeCommand(self, command, wait, timeout)
......@@ -223,20 +223,6 @@ class Shutter(object):
def __exit__(self, exc_type, exc_value, traceback):
self.close()
@property
def is_open(self):
return self.state == self.OPEN
@property
def is_closed(self):
return self.state == self.CLOSED
def __enter__(self):
self.open()
def __exit__(self, exc_type, exc_value, traceback):
self.close()
@property
def external_control(self):
return self._external_ctrl
......
......@@ -196,7 +196,7 @@ class Icepap(Controller):
status = axis._state()
else:
last_power_time = self._last_axis_power_time.get(axis, 0)
if time.time() - last_power_time < 1.0:
if time.time() - last_power_time < 1.0 and not isinstance(axis, LinkedAxis):
status = int(_command(self._cnx, "%s:?STATUS" % axis.address), 16)
else:
self._last_axis_power_time.pop(axis, None)
......
......@@ -5,8 +5,15 @@
# Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.common.axis import Axis, NoSettingsAxis, DEFAULT_POLLING_TIME, lazy_init
from bliss.common.axis import (
Axis,
NoSettingsAxis,
DEFAULT_POLLING_TIME,
lazy_init,
Motion,
)
from . import _ackcommand, _command
import gevent
class LinkedAxis(Axis):
......@@ -133,15 +140,20 @@ class LinkedAxis(Axis):
# create motion object for hooks
motion = Motion(self, switch, None, "homing")
self.__execute_pre_move_hook(motion)
self._Axis__execute_pre_move_hook(motion)
def start_one(controller, motions):
cnx = controller._cnx
cmd = "HOME STRICT %s %s" % (
motions[0].axis.address,
("+1" if motions[0].target_pos > 0 else "-1"),
home_dir = "+1" if motions[0].target_pos > 0 else "-1"
cmd = "#HOME STRICT %s" % " ".join(
(str(rm.address) + " " + home_dir for rm in self.real_axes)
)
_command(
cnx,
cmd,
pre_cmd="#DISPROT ALL %s ; "
% " ".join((str(rm.address) for rm in self.real_axes)),
)
_ackcommand(cnx, cmd)
# IcePAP status is not immediately MOVING after home search command is sent
gevent.sleep(0.2)
......
......@@ -461,7 +461,15 @@ class musst(object):
buffer_size, nb_buffer = self.get_event_buffer_size()
buffer_memory = buffer_size * nb_buffer
current_offset, current_buffer_id = self.get_event_memory_pointer()
for i in range(10):
curr_state = self.STATE
current_offset, current_buffer_id = self.get_event_memory_pointer()
if current_buffer_id == 0 and current_offset != 64:
break
if curr_state != self.RUN_STATE:
break
gevent.sleep(100e-3) # wait a little bit before re-asking
current_offset = current_buffer_id * buffer_size + current_offset
from_offset = (from_event_id * nb_counters) % buffer_memory
......
"""Controller for the mythen 1d detector."""
import numpy
from .mythenlib import MythenInterface
from bliss.common.measurement import BaseCounter, counter_namespace
from bliss.scanning.chain import AcquisitionDevice, AcquisitionChannel
def interface_property(name, mode=False):
getter_name = name + "_enabled" if mode else "get_" + name
setter_name = "enable_" + name if mode else "set_" + name
assert getter_name in dir(MythenInterface)
assert setter_name in dir(MythenInterface)
def getter(self):
return getattr(self._interface, getter_name)()
def setter(self, value):
return getattr(self._interface, setter_name)(value)
return property(getter, setter)
class Mythen(object):
_settings = [
# General configuration
"nmodules",
# Acquisition configuration
"delay_after_frame",
"nframes",
"nbits",
"exposure_time",
# Detector configuration
"energy",
"threshold",
# Data correction settings
"bad_channel_interpolation",
"flat_field_correction",
"rate_correction",
"rate_correction_deadtime",
# Trigger / Gate settings
"continuous_trigger_mode",
"single_trigger_mode",
"delay_before_frame",
"gate_mode",
"ngates",
"input_polarity",
"output_polarity",
]
def __init__(self, name, config):
self._name = name
self._config = config
self._hostname = config["hostname"]
self._interface = MythenInterface(self._hostname)
self._apply_configuration()
self._counter = MythenCounter(self)
def finalize(self):
self._interface.close()
# Counter access
@property
def counters(self):
return counter_namespace([self._counter])
# Manage configuration
def _apply_configuration(self):
if self._config.get("apply_defaults"):
self.reset()
for key, value in self._config.items():
if key in self._settings:
setattr(self, key, value)
def _get_configuration(self):
return [(key, getattr(self, key)) for key in self._settings]
def __repr__(self):
lines = ["Mythen on {}:".format(self._hostname)]
lines += [
" {:<25s} = {}".format(key, value)
for key, value in self._get_configuration()
]
return "\n".join(lines)
# Bliss properties
@property
def name(self):
return self._name
@property
def hostname(self):
return self._hostname
# General configuration
nmodules = interface_property("nmodules")
# Acquisition configuration
delay_after_frame = interface_property("delayafterframe")
nframes = interface_property("nframes")
nbits = interface_property("nbits")
exposure_time = interface_property("exposure_time")
# Detector configuration
energy = interface_property("energy")
threshold = interface_property("kthresh")
# Data correction configuration
bad_channel_interpolation = interface_property("badchannelinterpolation", mode=True)
flat_field_correction = interface_property("flatfieldcorrection", mode=True)
rate_correction = interface_property("ratecorrection", mode=True)
rate_correction_deadtime = interface_property("ratecorrection_deadtime")
# Trigger / Gate configuration
continuous_trigger_mode = interface_property("continuoustrigger", mode=True)
single_trigger_mode = interface_property("singletrigger", mode=True)
gate_mode = interface_property("gatemode", mode=True)
delay_before_frame = interface_property("delaybeforeframe")
ngates = interface_property("ngates")
input_polarity = interface_property("inputpolarity")
output_polarity = interface_property("outputpolarity")
# Expose all interface getters
def __getattr__(self, attr):
if attr.startswith("get_") and attr in dir(self._interface):
return getattr(self._interface, attr)
raise AttributeError(attr)
def __dir__(self):
lst = list(self.__dict__)
lst += dir(type(self))
lst += [key for key in dir(self._interface) if key.startswith("get_")]
return sorted(set(lst))
# Commands
def reset(self):
self._interface.reset()
def start(self):
self._interface.start()
def stop(self):
self._interface.stop()
def readout(self):
return self._interface.readout(1)[0]
# Acquisition routine
def run(self, acquisition_number=1, acquisition_time=1.):
self.nframes = acquisition_number
self.exposure_time = acquisition_time
try:
self.start()
for _ in range(acquisition_number):
yield self.readout()
finally:
self.stop()
# Mythen counter
class MythenCounter(BaseCounter):
# Initialization
def __init__(self, controller):
self._name = "spectrum"
self._controller = controller
@property
def name(self):
return self._name
@property
def controller(self):
return self._controller
# Data properties
@property
def dtype(self):
return numpy.int32
@property
def shape(self):
return (self.controller.get_nchannels(),)
# Get acquisition device
def create_acquisition_device(self, scan_pars, **settings):
scan_pars.update(settings)
count_time = scan_pars.pop("count_time")
return MythenAcquistionDevice(self, count_time, **scan_pars)
class MythenAcquistionDevice(AcquisitionDevice):
# Initialization
def __init__(self, counter, count_time, **kwargs):
self.kwargs = kwargs
self.counter = counter
self.count_time = count_time
valid_names = ("npoints", "trigger_type", "prepare_once", "start_once")
valid_kwargs = {
key: value for key, value in kwargs.items() if key in valid_names
}
super(MythenAcquistionDevice, self).__init__(
counter.controller, counter.controller.name, **valid_kwargs
)
self.channels.append(
AcquisitionChannel(counter.name, counter.dtype, counter.shape)
)
def add_counter(self, counter):
assert self.counter == counter
# Flow control
def prepare(self):
self.device.nframes = self.npoints
self.device.exposure_time = self.count_time
def start(self):
self.device.start()
def trigger(self):
# Software trigger is not supported by the mythen
pass
def reading(self):
spectrum = self.device.readout()
self.channels.update({self.counter.name: spectrum})
def stop(self):
self.device.stop()
This diff is collapsed.
......@@ -58,24 +58,32 @@ class Opiom:
self.__program = config_tree["program"]
self.__base_path = config_tree.get("opiom_prg_root", OPIOM_PRG_ROOT)
self.__debug = False
try:
msg = self.comm("?VER", timeout=50e-3)
except serial.SerialTimeout:
msg = self.comm("?VER", timeout=50e-3)
if not msg.startswith("OPIOM"):
# Sometimes, have to talk twice to the OPIOM in order to get the proper first answer.
for ii in range(2):
try:
msg = self.comm("?VER", timeout=50e-3)
except serial.SerialTimeout:
msg = ""
if msg.startswith("OPIOM"):
break
else:
raise IOError("No opiom connected at %s" % serial)
self.comm("MODE normal")
def __repr__(self):
return "Opiom : %s with program %s" % (self._cnx, self.__program)
return "opiom: %s" % self._cnx
def setDebug(self, flag):
self.__debug = flag is True
def getDebug(self):
@property
def debug(self):
return self.__debug
@debug.setter
def debug(self, flag):
self.__debug = bool(flag)
def __debugMsg(self, wr, msg):
if self.__debug:
print "%-5.5s on %s > %s" % (wr, self.name, msg)
......@@ -142,59 +150,80 @@ class Opiom:
self.__debugMsg("Read", msg.strip("\n\r"))
return msg.strip("\r\n")
def load_program(self):
def load_program(self, prog_name=None):
pldid = self.comm("?PLDID")
file_pldid, file_project = self._getFilePLDIDandPROJECT()
if file_pldid and file_pldid != pldid:
print "Load program:", self.__program
srcsz = int(self.comm("?SRCSZ").split()[0])
offsets, opmfile = self._getoffset()
if (offsets["src_c"] - offsets["src_cc"]) < srcsz:
SRCST = offsets["src_cc"]
srcsz = offsets["src_c"] - offsets["src_cc"]
if prog_name is None:
prog_name = self.__program
if prog_name == "default":
if pldid == "255":
# already default
return
else:
SRCST = offsets["src_c"]
srcsz = offsets["jed"] - offsets["src_c"]
binsz = offsets["size"] - offsets["jed"]
sendarray = opmfile[SRCST : SRCST + srcsz]
sendarray += opmfile[offsets["jed"] :]
if self.comm_ack("MODE program") != "OK":
raise IOError("Can't program opiom %s" % str(self))
print "Uploading default program"
else:
try:
file_pldid, file_project = self._getFilePLDIDandPROJECT(prog_name)
except ValueError:
# invalid unpacking
raise IOError(
"opiom %s: cannot find program %s" % (str(self), prog_name)
)
if (
self.comm_ack(
'PROG %d %d %d %d "%s"'
% (binsz, srcsz, self.FSIZE, int(file_pldid), file_project)
if file_pldid and file_pldid != pldid:
print "Uploading opiom program, please wait"
srcsz = int(self.comm("?SRCSZ").split()[0])
offsets, opmfile = self._getoffset(prog_name)
if (offsets["src_c"] - offsets["src_cc"]) < srcsz:
SRCST = offsets["src_cc"]
srcsz = offsets["src_c"] - offsets["src_cc"]
else:
SRCST = offsets["src_c"]
srcsz = offsets["jed"] - offsets["src_c"]
binsz = offsets["size"] - offsets["jed"]
sendarray = opmfile[SRCST : SRCST + srcsz]
sendarray += opmfile[offsets["jed"] :]
else:
# program already loaded
return
if self.comm_ack("MODE program") != "OK":
raise IOError("Can't program opiom %s" % str(self))
if prog_name == "default":
ans = self.comm_ack("PROG DEFAULT")
sendarray = []
else:
ans = self.comm_ack(
'PROG %d %d %d %d "%s"'
% (binsz, srcsz, self.FSIZE, int(file_pldid), file_project)
)
if ans != "OK":
self.comm("MODE normal")
raise IOError("Can't start programming opiom %s" % str(self))
for frame_n, index in enumerate(range(0, len(sendarray), self.FSIZE)):
with KillMask():
cmd = "#*FRM %d\r" % frame_n
self.raw_write(cmd)
self.raw_bin_write(sendarray[index : index + self.FSIZE])
answer = self._cnx.readline("\r\n")
if answer == "OK":
continue
raise RuntimeError(
"Load program: [%s] returned [%s]" % (cmd.strip(), answer)
)
!= "OK"
):
self.comm("MODE normal")
raise IOError("Can't start programming opiom %s" % str(self))
for frame_n, index in enumerate(range(0, len(sendarray), self.FSIZE)):
with KillMask():
cmd = "#*FRM %d\r" % frame_n
self.raw_write(cmd)
self.raw_bin_write(sendarray[index : index + self.FSIZE])
answer = self._cnx.readline("\r\n")
if answer == "OK":
continue
raise RuntimeError(
"Load program: [%s] returned [%s]" % (cmd.strip(), answer)
)
# waiting end programming
while 1:
stat_num = self.comm("?PSTAT")
self.__debugMsg("Load", stat_num)
try:
stat, percent = stat_num.split()
except ValueError:
stat = stat_num
break
return stat == "DONE"
# waiting end programming
while True:
stat_num = self.comm("?PSTAT")
self.__debugMsg("Load", stat_num)
try:
stat, percent = stat_num.split()
except ValueError:
stat = stat_num
break
return stat == "DONE"
def _display_bits(self, prefix, bits):
for i in range(1, 9):
......@@ -208,8 +237,8 @@ class Opiom:
print
def _getoffset(self):
with remote_open(os.path.join(self.__base_path, self.__program + ".opm")) as f:
def _getoffset(self, prog_name):
with remote_open(os.path.join(self.__base_path, prog_name + ".opm")) as f:
line = f.read(14)
f.seek(0)
opmfile = f.read()
......@@ -227,10 +256,10 @@ class Opiom:
opmfile,
)
def _getFilePLDIDandPROJECT(self):
def _getFilePLDIDandPROJECT(self, prog_name):
TOKEN = "#pldid#"
PROJECT_TOKEN = "#project#"
with remote_open(os.path.join(self.__base_path, self.__program + ".opm")) as f:
with remote_open(os.path.join(self.__base_path, prog_name + ".opm")) as f:
begin = -1
for line in f:
begin = line.find(TOKEN)
......
......@@ -32,7 +32,8 @@ provided by the controller itself::
Here's an working example::
>>> from bliss.config.static import get_config
>>> from bliss.common.scans import timescan, get_data
>>> from bliss.common.scans import timescan
>>> from bliss.data.scan import get_data
>>> config = get_config()
>>> pepu = config.get('pepudcm2')
......
......@@ -244,3 +244,4 @@ def session(beacon):
def pytest_addoption(parser):
parser.addoption("--pepu", help="pepu host name")
parser.addoption("--ct2", help="ct2 address")
parser.addoption("--mythen", action="store", help="mythen host name")
......@@ -5,8 +5,6 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import pytest
def pytest_collection_modifyitems(config, items):
devices = ["pepu", "ct2"]
......
"""Hardware testing module for the Mythen."""
import socket
import pytest
from bliss.controllers.mythenlib import MythenInterface, Polarity
from bliss.controllers.mythenlib import MythenCompatibilityError, MythenCommandError
@pytest.fixture(params=[socket.SOCK_STREAM, socket.SOCK_DGRAM], ids=["TCP", "UDP"])
def mythen(request):
hostname = request.config.getoption("--mythen")
if hostname is None:
pytest.xfail("The --mythen hostname option has to be provided.")
mythen = MythenInterface(hostname, request.param)
yield mythen
mythen.close()
def test_errors(mythen):
with pytest.raises(MythenCompatibilityError) as info:
mythen.get_commandsetid()
assert (
str(info.value)
== "[Version 3.0.0] Command '-get commandsetid' requires version >= 4.0.0"
)
with pytest.raises(MythenCommandError) as info:
mythen._run_command("-idontexist", "int")
assert str(info.value) == "[Error -1] Unknown command ('-idontexist')"
def test_common_getters(mythen):
assert mythen.get_version() == (3, 0, 0)
assert mythen.get_nmodules() == 1
assert mythen.get_modchannels() == (1280,)
def test_general_getters(mythen):
assert mythen.get_assemblydate() == "Mon Aug 21 13:06:10 CEST 2017"
assert all(x in (0, 1) for x in mythen.get_badchannels())
assert mythen.get_commandid() + 1 == mythen.get_commandid()
with pytest.raises(MythenCompatibilityError):
mythen.get_commandsetid()
with pytest.raises(MythenCompatibilityError):
mythen.get_dcstemperature()
with pytest.raises(MythenCompatibilityError):
mythen.get_frameratemax()
with pytest.raises(MythenCompatibilityError):
mythen.get_fwversion()
with pytest.raises(MythenCompatibilityError):
mythen.get_humidity()
with pytest.raises(MythenCompatibilityError):
mythen.get_highvoltage()
with pytest.raises(MythenCompatibilityError):
mythen.get_modfwversion()
assert mythen.get_modnum() == ("0x192",)
assert mythen.get_module() == 0xffff
assert mythen.get_nmaxmodules() == 1
assert mythen.get_sensormaterial() == ("silicon",)
assert mythen.get_sensorthickness() == (1000,)
with pytest.raises(MythenCompatibilityError):
mythen.get_sensorwidth()
assert mythen.get_systemnum() == 256
with pytest.raises(MythenCompatibilityError):
mythen.get_temperature()
with pytest.raises(MythenCompatibilityError):
mythen.get_testversion()
def test_general_commands(mythen):
mythen.select_module(0)
assert mythen.get_module() == 0
mythen.select_all_modules()
assert mythen.get_module() == 0xffff
mythen.set_nmodules(1)
assert mythen.get_nmodules() == 1
mythen.reset()
def test_acquisition_settings(mythen):
mythen.set_delayafterframe(1.)
assert mythen.get_delayafterframe() == 1.
mythen.set_delayafterframe()
assert mythen.get_delayafterframe() == 0
mythen.set_nframes(2)
assert mythen.get_nframes() == 2
mythen.set_nframes()
assert mythen.get_nframes() == 1
mythen.set_nbits(4)
assert mythen.get_nbits() == 4
mythen.set_nbits()
assert mythen.get_nbits() == 24
mythen.set_exposure_time(2.)
assert mythen.get_exposure_time() == 2.
mythen.set_exposure_time()
assert mythen.get_exposure_time() == 1.
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
with pytest.raises(MythenCompatibilityError):
mythen.get_readouttimes()
def test_acquisition_control(mythen):
# Configuration
mythen.set_delayafterframe(0.05)
mythen.set_exposure_time(0.05)
mythen.set_nframes(5)
# Start acquisition
mythen.start()
# Check status
status = mythen.get_status()
assert status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Wait for first frame
array = mythen.readout()
assert array.shape == (1, 1280)
# Check status
status = mythen.get_status()
assert status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Wait for the last 4 frames
with pytest.raises(MythenCompatibilityError):
mythen.readout(4)
for _ in range(4):
array = mythen.readout(1)
assert array.shape == (1, 1280)
# Check status
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Stop acquisition
mythen.stop()
# Check status
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
def test_detector_settings(mythen):
mythen.set_energy(9.)
assert mythen.get_energy() == (pytest.approx(9.),)
mythen.set_energy()
assert mythen.get_energy() == (pytest.approx(8.05),)
assert mythen.get_energymin() == (pytest.approx(7.39),)
assert mythen.get_energymax() == (pytest.approx(24.),)
mythen.set_kthresh(7.)
assert mythen.get_kthresh() == (pytest.approx(7.),)
mythen.set_kthresh()
assert mythen.get_kthresh() == (pytest.approx(6.4),)
assert mythen.get_kthreshmin() == (pytest.approx(6.),)
assert mythen.get_kthreshmax() == (pytest.approx(12.),)
mythen.set_kthresh_and_energy(7., 9.)
assert mythen.get_kthresh() == (pytest.approx(7.),)
assert mythen.get_energy() == (pytest.approx(9.),)
mythen.set_kthresh_and_energy()
assert mythen.get_kthresh() == (pytest.approx(6.4),)
assert mythen.get_energy() == (pytest.approx(8.05),)
def test_load_predefined_settings(mythen):
with pytest.raises(ValueError):
mythen.load_predefined_settings("Ar")
for element in ("Cu", "Mo", "Ag"):
mythen.load_predefined_settings(element)
for element in ("Cr",):
with pytest.raises(MythenCommandError):
mythen.load_predefined_settings(element)
def test_data_correction(mythen):
mythen.enable_badchannelinterpolation(True)
assert mythen.badchannelinterpolation_enabled()
mythen.enable_badchannelinterpolation(False)
assert not mythen.badchannelinterpolation_enabled()
mythen.enable_flatfieldcorrection(True)
assert mythen.flatfieldcorrection_enabled()
mythen.enable_flatfieldcorrection(False)
assert not mythen.flatfieldcorrection_enabled()
with pytest.raises(MythenCompatibilityError):
mythen.set_flatfield(0, [1, 2, 3])
with pytest.raises(MythenCompatibilityError):
mythen.load_flatfield(1)
assert mythen.get_flatfield().shape == (1280,)
assert mythen.get_flatfield_cutoff() == 16777216
mythen.enable_ratecorrection(True)
assert mythen.ratecorrection_enabled()
mythen.enable_ratecorrection(False)
assert not mythen.ratecorrection_enabled()
mythen.set_ratecorrection_deadtime(400 * 1e-9)
assert mythen.get_ratecorrection_deadtime() == pytest.approx(400 * 1e-9)
mythen.set_ratecorrection_deadtime(-1)
default = 140.400573 * 1e-9
assert mythen.get_ratecorrection_deadtime() == pytest.approx(default)
def test_trigger_and_gate(mythen):
mythen.enable_continuoustrigger(True)
assert mythen.continuoustrigger_enabled()
mythen.enable_continuoustrigger(False)
assert not mythen.continuoustrigger_enabled()
mythen.enable_singletrigger(True)
assert mythen.singletrigger_enabled()
mythen.enable_singletrigger(False)
assert not mythen.singletrigger_enabled()
mythen.set_delaybeforeframe(.5)
assert mythen.get_delaybeforeframe() == pytest.approx(.5)
mythen.set_delaybeforeframe()
assert mythen.get_delaybeforeframe() == pytest.approx(0.)
mythen.enable_gatemode(True)
assert mythen.gatemode_enabled()
mythen.enable_gatemode(False)
assert not mythen.gatemode_enabled()
mythen.set_ngates(10)
assert mythen.get_ngates() == 10
mythen.set_ngates()
assert mythen.get_ngates() == 1
mythen.set_inputpolarity(Polarity.High)
assert mythen.get_inputpolarity() == Polarity.High
mythen.set_inputpolarity(Polarity.Low)
assert mythen.get_inputpolarity() == Polarity.Low
mythen.set_outputpolarity(Polarity.High)
assert mythen.get_outputpolarity() == Polarity.High
mythen.set_outputpolarity(Polarity.Low)
assert mythen.get_outputpolarity() == Polarity.Low
def test_debugging(mythen):
mythen.start_logging()
with pytest.raises(MythenCompatibilityError):
assert mythen.logging_running()
logs = mythen.stop_logging()
assert "Command: -log argument: stop" in logs
data = mythen.test_pattern()
assert data.shape == (1280,)
assert list(data) == list(range(1280))
@pytest.mark.parametrize("nbits", [4, 8, 16, 24])
def test_raw_readout(mythen, nbits):
# Configuration
mythen.set_nbits(nbits)
mythen.set_delayafterframe(0.05)
mythen.set_exposure_time(0.05)
mythen.set_nframes(1)
# Start acquisition
mythen.start()
# Wait for the frame
array = mythen.raw_readout()
assert array.shape == (1280,)
# Stop acquisition
mythen.stop()
import pytest
import numpy as np
from mock import Mock
from bliss.common import scans
from bliss.controllers.mythen import Mythen
from bliss.controllers import mythenlib
@pytest.fixture
def run_command(monkeypatch):
setters = {}
commands = []
def run_command(sock, command, return_type="int", return_shape=(), payload=b""):
# General commands
if command == "-get version":
return "v3.0.0"
if command == "-get nmodules":
return 1
# Detect setters
args = command[1:].split(" ")
if args[0] == "readout":
assert args[1:] == ["1"]
args = args[:1]
if len(args) == 2 and args[0] != "get":
setters[args[0]] = args[1]
if len(args) == 1:
commands.append(args[0])
# Return values
if return_shape == ():
return {
"int": 0,
"float": 1.23e9 if "tau" in command else 4.56,
"long long": 31400000,
}[return_type]
if return_shape == (1,):
return np.array([run_command(sock, command, return_type, (), payload)])
if return_shape == (1, 1280):
return np.array(range(1280)).reshape(return_shape)
# Not managed
assert False
monkeypatch.setattr(mythenlib, "socket", Mock())
monkeypatch.setattr(mythenlib, "run_command", run_command)
run_command.setters = setters
run_command.commands = commands
yield run_command
def test_myten_basic(run_command):
m = Mythen("test", {"hostname": "mymythen"})
assert m.name == "test"
assert m.hostname == "mymythen"
m._interface._sock.connect.assert_called_once_with(("mymythen", 1030))
assert (
repr(m)
== """\
Mythen on mymythen:
nmodules = 1
delay_after_frame = 3.14
nframes = 0
nbits = 0
exposure_time = 3.14
energy = (4.56,)
threshold = (4.56,)
bad_channel_interpolation = False
flat_field_correction = False
rate_correction = False
rate_correction_deadtime = (1.23,)
continuous_trigger_mode = False
single_trigger_mode = False
delay_before_frame = 3.14
gate_mode = False
ngates = 0
input_polarity = 0
output_polarity = 0"""
)
m.finalize()
m._interface._sock.close.assert_called_once_with()
def test_myten_configuration(run_command):
Mythen("test", {"hostname": "mymythen", "energy": 8.88, "threshold": 4.44})
assert run_command.commands == []
assert run_command.setters == {"energy": "8.88", "kthresh": "4.44"}
def test_myten_reset_configuration(run_command):
Mythen("test", {"hostname": "mymythen", "apply_defaults": True, "gate_mode": True})
assert run_command.commands == ["reset"]
assert run_command.setters == {"gateen": "1"}
def test_mythen_readonly_getters(run_command):
m = Mythen("test", {"hostname": "mymythen"})
assert "get_version" in dir(m)
assert m.get_version() == (3, 0, 0)
def test_mythen_commands(run_command):
m = Mythen("test", {"hostname": "mymythen"})
m.reset()
m.start()
assert list(m.readout()) == list(range(1280))
m.stop()
assert run_command.commands == ["reset", "start", "readout", "stop"]
def test_mythen_run(run_command):
m = Mythen("test", {"hostname": "mymythen"})
data = list(m.run(10, 0.1))
assert len(data) == 10
for frame in data:
assert list(frame) == list(range(1280))
assert run_command.commands == ["start"] + ["readout"] * 10 + ["stop"]
assert run_command.setters == {"frames": "10", "time": "1000000"}
def test_myten_counter(run_command):
m = Mythen("test", {"hostname": "mymythen"})
counter = m.counters.spectrum
assert counter.name == "spectrum"
assert counter.dtype == np.int32
assert counter.shape == (1280,)
assert counter.controller == m
def test_mythen_from_config(run_command, beacon):
m = beacon.get("mythen1")
assert m.hostname == "mymythen"
assert run_command.commands == ["reset"]
def test_mythen_ct_scan(run_command, beacon):
mythen = beacon.get("mythen1")
scan = scans.ct(0.1, mythen, return_scan=True, save=False)
data = scan.get_data()["spectrum"]
assert data.shape == (1, 1280)
assert list(data[0]) == list(range(1280))
def test_mythen_default_chain_with_counter_namespace(run_command, beacon):
m0 = beacon.get("m0")
mythen = beacon.get("mythen1")
scan = scans.ascan(m0, 0, 10, 3, 0.1, mythen, return_scan=True, save=False)
data = scan.get_data()["spectrum"]
assert data.shape == (3, 1280)
assert np.array_equal(data, [list(range(1280))] * 3)
name: mythen1
module: mythen
class: Mythen
plugin: bliss
hostname: mymythen
apply_defaults: True
\ No newline at end of file
......@@ -23,6 +23,7 @@
- spec_m1
- mot_1_disable_cache
- mot_2_disable_cache
- mythen1
synoptic:
svg-file: ./test_synoptic.svg
elements:
......