...
 
Commits (3)
"""Controller for the mythen 1d detector."""
import numpy
from .mythenlib import MythenInterface
from bliss.common.measurement import BaseCounter, counter_namespace
from bliss.scanning.chain import AcquisitionDevice, AcquisitionChannel
def interface_property(name, mode=False):
getter_name = name + "_enabled" if mode else "get_" + name
setter_name = "enable_" + name if mode else "set_" + name
assert getter_name in dir(MythenInterface)
assert setter_name in dir(MythenInterface)
def getter(self):
return getattr(self._interface, getter_name)()
def setter(self, value):
return getattr(self._interface, setter_name)(value)
return property(getter, setter)
class Mythen(object):
_settings = [
# General configuration
"nmodules",
# Acquisition configuration
"delay_after_frame",
"nframes",
"nbits",
"exposure_time",
# Detector configuration
"energy",
"threshold",
# Data correction settings
"bad_channel_interpolation",
"flat_field_correction",
"rate_correction",
"rate_correction_deadtime",
# Trigger / Gate settings
"continuous_trigger_mode",
"single_trigger_mode",
"delay_before_frame",
"gate_mode",
"ngates",
"input_polarity",
"output_polarity",
]
def __init__(self, name, config):
self._name = name
self._config = config
self._hostname = config["hostname"]
self._interface = MythenInterface(self._hostname)
self._apply_configuration()
self._counter = MythenCounter(self)
def finalize(self):
self._interface.close()
# Counter access
@property
def counters(self):
return counter_namespace([self._counter])
# Manage configuration
def _apply_configuration(self):
if self._config.get("apply_defaults"):
self.reset()
for key, value in self._config.items():
if key in self._settings:
setattr(self, key, value)
def _get_configuration(self):
return [(key, getattr(self, key)) for key in self._settings]
def __repr__(self):
lines = ["Mythen on {}:".format(self._hostname)]
lines += [
" {:<25s} = {}".format(key, value)
for key, value in self._get_configuration()
]
return "\n".join(lines)
# Bliss properties
@property
def name(self):
return self._name
@property
def hostname(self):
return self._hostname
# General configuration
nmodules = interface_property("nmodules")
# Acquisition configuration
delay_after_frame = interface_property("delayafterframe")
nframes = interface_property("nframes")
nbits = interface_property("nbits")
exposure_time = interface_property("exposure_time")
# Detector configuration
energy = interface_property("energy")
threshold = interface_property("kthresh")
# Data correction configuration
bad_channel_interpolation = interface_property("badchannelinterpolation", mode=True)
flat_field_correction = interface_property("flatfieldcorrection", mode=True)
rate_correction = interface_property("ratecorrection", mode=True)
rate_correction_deadtime = interface_property("ratecorrection_deadtime")
# Trigger / Gate configuration
continuous_trigger_mode = interface_property("continuoustrigger", mode=True)
single_trigger_mode = interface_property("singletrigger", mode=True)
gate_mode = interface_property("gatemode", mode=True)
delay_before_frame = interface_property("delaybeforeframe")
ngates = interface_property("ngates")
input_polarity = interface_property("inputpolarity")
output_polarity = interface_property("outputpolarity")
# Expose all interface getters
def __getattr__(self, attr):
if attr.startswith("get_") and attr in dir(self._interface):
return getattr(self._interface, attr)
raise AttributeError(attr)
def __dir__(self):
lst = list(self.__dict__)
lst += dir(type(self))
lst += [key for key in dir(self._interface) if key.startswith("get_")]
return sorted(set(lst))
# Commands
def reset(self):
self._interface.reset()
def start(self):
self._interface.start()
def stop(self):
self._interface.stop()
def readout(self):
return self._interface.readout(1)[0]
# Acquisition routine
def run(self, acquisition_number=1, acquisition_time=1.):
self.nframes = acquisition_number
self.exposure_time = acquisition_time
try:
self.start()
for _ in range(acquisition_number):
yield self.readout()
finally:
self.stop()
# Mythen counter
class MythenCounter(BaseCounter):
# Initialization
def __init__(self, controller):
self._name = "spectrum"
self._controller = controller
@property
def name(self):
return self._name
@property
def controller(self):
return self._controller
# Data properties
@property
def dtype(self):
return numpy.int32
@property
def shape(self):
return (self.controller.get_nchannels(),)
# Get acquisition device
def create_acquisition_device(self, scan_pars, **settings):
scan_pars.update(settings)
count_time = scan_pars.pop("count_time")
return MythenAcquistionDevice(self, count_time, **scan_pars)
class MythenAcquistionDevice(AcquisitionDevice):
# Initialization
def __init__(self, counter, count_time, **kwargs):
self.kwargs = kwargs
self.counter = counter
self.count_time = count_time
valid_names = ("npoints", "trigger_type", "prepare_once", "start_once")
valid_kwargs = {
key: value for key, value in kwargs.items() if key in valid_names
}
super(MythenAcquistionDevice, self).__init__(
counter.controller, counter.controller.name, **valid_kwargs
)
self.channels.append(
AcquisitionChannel(counter.name, counter.dtype, counter.shape)
)
def add_counter(self, counter):
assert self.counter == counter
# Flow control
def prepare(self):
self.device.nframes = self.npoints
self.device.exposure_time = self.count_time
def start(self):
self.device.start()
def trigger(self):
# Software trigger is not supported by the mythen
pass
def reading(self):
spectrum = self.device.readout()
self.channels.update({self.counter.name: spectrum})
def stop(self):
self.device.stop()
This diff is collapsed.
......@@ -244,3 +244,4 @@ def session(beacon):
def pytest_addoption(parser):
parser.addoption("--pepu", help="pepu host name")
parser.addoption("--ct2", help="ct2 address")
parser.addoption("--mythen", action="store", help="mythen host name")
......@@ -5,8 +5,6 @@
# Copyright (c) 2016 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import pytest
def pytest_collection_modifyitems(config, items):
devices = ["pepu", "ct2"]
......
"""Hardware testing module for the Mythen."""
import socket
import pytest
from bliss.controllers.mythenlib import MythenInterface, Polarity
from bliss.controllers.mythenlib import MythenCompatibilityError, MythenCommandError
@pytest.fixture(params=[socket.SOCK_STREAM, socket.SOCK_DGRAM], ids=["TCP", "UDP"])
def mythen(request):
hostname = request.config.getoption("--mythen")
if hostname is None:
pytest.xfail("The --mythen hostname option has to be provided.")
mythen = MythenInterface(hostname, request.param)
yield mythen
mythen.close()
def test_errors(mythen):
with pytest.raises(MythenCompatibilityError) as info:
mythen.get_commandsetid()
assert (
str(info.value)
== "[Version 3.0.0] Command '-get commandsetid' requires version >= 4.0.0"
)
with pytest.raises(MythenCommandError) as info:
mythen._run_command("-idontexist", "int")
assert str(info.value) == "[Error -1] Unknown command ('-idontexist')"
def test_common_getters(mythen):
assert mythen.get_version() == (3, 0, 0)
assert mythen.get_nmodules() == 1
assert mythen.get_modchannels() == (1280,)
def test_general_getters(mythen):
assert mythen.get_assemblydate() == "Mon Aug 21 13:06:10 CEST 2017"
assert all(x in (0, 1) for x in mythen.get_badchannels())
assert mythen.get_commandid() + 1 == mythen.get_commandid()
with pytest.raises(MythenCompatibilityError):
mythen.get_commandsetid()
with pytest.raises(MythenCompatibilityError):
mythen.get_dcstemperature()
with pytest.raises(MythenCompatibilityError):
mythen.get_frameratemax()
with pytest.raises(MythenCompatibilityError):
mythen.get_fwversion()
with pytest.raises(MythenCompatibilityError):
mythen.get_humidity()
with pytest.raises(MythenCompatibilityError):
mythen.get_highvoltage()
with pytest.raises(MythenCompatibilityError):
mythen.get_modfwversion()
assert mythen.get_modnum() == ("0x192",)
assert mythen.get_module() == 0xffff
assert mythen.get_nmaxmodules() == 1
assert mythen.get_sensormaterial() == ("silicon",)
assert mythen.get_sensorthickness() == (1000,)
with pytest.raises(MythenCompatibilityError):
mythen.get_sensorwidth()
assert mythen.get_systemnum() == 256
with pytest.raises(MythenCompatibilityError):
mythen.get_temperature()
with pytest.raises(MythenCompatibilityError):
mythen.get_testversion()
def test_general_commands(mythen):
mythen.select_module(0)
assert mythen.get_module() == 0
mythen.select_all_modules()
assert mythen.get_module() == 0xffff
mythen.set_nmodules(1)
assert mythen.get_nmodules() == 1
mythen.reset()
def test_acquisition_settings(mythen):
mythen.set_delayafterframe(1.)
assert mythen.get_delayafterframe() == 1.
mythen.set_delayafterframe()
assert mythen.get_delayafterframe() == 0
mythen.set_nframes(2)
assert mythen.get_nframes() == 2
mythen.set_nframes()
assert mythen.get_nframes() == 1
mythen.set_nbits(4)
assert mythen.get_nbits() == 4
mythen.set_nbits()
assert mythen.get_nbits() == 24
mythen.set_exposure_time(2.)
assert mythen.get_exposure_time() == 2.
mythen.set_exposure_time()
assert mythen.get_exposure_time() == 1.
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
with pytest.raises(MythenCompatibilityError):
mythen.get_readouttimes()
def test_acquisition_control(mythen):
# Configuration
mythen.set_delayafterframe(0.05)
mythen.set_exposure_time(0.05)
mythen.set_nframes(5)
# Start acquisition
mythen.start()
# Check status
status = mythen.get_status()
assert status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Wait for first frame
array = mythen.readout()
assert array.shape == (1, 1280)
# Check status
status = mythen.get_status()
assert status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Wait for the last 4 frames
with pytest.raises(MythenCompatibilityError):
mythen.readout(4)
for _ in range(4):
array = mythen.readout(1)
assert array.shape == (1, 1280)
# Check status
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
# Stop acquisition
mythen.stop()
# Check status
status = mythen.get_status()
assert not status.running
assert not status.inactive_exposure
assert status.empty_buffer
def test_detector_settings(mythen):
mythen.set_energy(9.)
assert mythen.get_energy() == (pytest.approx(9.),)
mythen.set_energy()
assert mythen.get_energy() == (pytest.approx(8.05),)
assert mythen.get_energymin() == (pytest.approx(7.39),)
assert mythen.get_energymax() == (pytest.approx(24.),)
mythen.set_kthresh(7.)
assert mythen.get_kthresh() == (pytest.approx(7.),)
mythen.set_kthresh()
assert mythen.get_kthresh() == (pytest.approx(6.4),)
assert mythen.get_kthreshmin() == (pytest.approx(6.),)
assert mythen.get_kthreshmax() == (pytest.approx(12.),)
mythen.set_kthresh_and_energy(7., 9.)
assert mythen.get_kthresh() == (pytest.approx(7.),)
assert mythen.get_energy() == (pytest.approx(9.),)
mythen.set_kthresh_and_energy()
assert mythen.get_kthresh() == (pytest.approx(6.4),)
assert mythen.get_energy() == (pytest.approx(8.05),)
def test_load_predefined_settings(mythen):
with pytest.raises(ValueError):
mythen.load_predefined_settings("Ar")
for element in ("Cu", "Mo", "Ag"):
mythen.load_predefined_settings(element)
for element in ("Cr",):
with pytest.raises(MythenCommandError):
mythen.load_predefined_settings(element)
def test_data_correction(mythen):
mythen.enable_badchannelinterpolation(True)
assert mythen.badchannelinterpolation_enabled()
mythen.enable_badchannelinterpolation(False)
assert not mythen.badchannelinterpolation_enabled()
mythen.enable_flatfieldcorrection(True)
assert mythen.flatfieldcorrection_enabled()
mythen.enable_flatfieldcorrection(False)
assert not mythen.flatfieldcorrection_enabled()
with pytest.raises(MythenCompatibilityError):
mythen.set_flatfield(0, [1, 2, 3])
with pytest.raises(MythenCompatibilityError):
mythen.load_flatfield(1)
assert mythen.get_flatfield().shape == (1280,)
assert mythen.get_flatfield_cutoff() == 16777216
mythen.enable_ratecorrection(True)
assert mythen.ratecorrection_enabled()
mythen.enable_ratecorrection(False)
assert not mythen.ratecorrection_enabled()
mythen.set_ratecorrection_deadtime(400 * 1e-9)
assert mythen.get_ratecorrection_deadtime() == pytest.approx(400 * 1e-9)
mythen.set_ratecorrection_deadtime(-1)
default = 140.400573 * 1e-9
assert mythen.get_ratecorrection_deadtime() == pytest.approx(default)
def test_trigger_and_gate(mythen):
mythen.enable_continuoustrigger(True)
assert mythen.continuoustrigger_enabled()
mythen.enable_continuoustrigger(False)
assert not mythen.continuoustrigger_enabled()
mythen.enable_singletrigger(True)
assert mythen.singletrigger_enabled()
mythen.enable_singletrigger(False)
assert not mythen.singletrigger_enabled()
mythen.set_delaybeforeframe(.5)
assert mythen.get_delaybeforeframe() == pytest.approx(.5)
mythen.set_delaybeforeframe()
assert mythen.get_delaybeforeframe() == pytest.approx(0.)
mythen.enable_gatemode(True)
assert mythen.gatemode_enabled()
mythen.enable_gatemode(False)
assert not mythen.gatemode_enabled()
mythen.set_ngates(10)
assert mythen.get_ngates() == 10
mythen.set_ngates()
assert mythen.get_ngates() == 1
mythen.set_inputpolarity(Polarity.High)
assert mythen.get_inputpolarity() == Polarity.High
mythen.set_inputpolarity(Polarity.Low)
assert mythen.get_inputpolarity() == Polarity.Low
mythen.set_outputpolarity(Polarity.High)
assert mythen.get_outputpolarity() == Polarity.High
mythen.set_outputpolarity(Polarity.Low)
assert mythen.get_outputpolarity() == Polarity.Low
def test_debugging(mythen):
mythen.start_logging()
with pytest.raises(MythenCompatibilityError):
assert mythen.logging_running()
logs = mythen.stop_logging()
assert "Command: -log argument: stop" in logs
data = mythen.test_pattern()
assert data.shape == (1280,)
assert list(data) == list(range(1280))
@pytest.mark.parametrize("nbits", [4, 8, 16, 24])
def test_raw_readout(mythen, nbits):
# Configuration
mythen.set_nbits(nbits)
mythen.set_delayafterframe(0.05)
mythen.set_exposure_time(0.05)
mythen.set_nframes(1)
# Start acquisition
mythen.start()
# Wait for the frame
array = mythen.raw_readout()
assert array.shape == (1280,)
# Stop acquisition
mythen.stop()
import pytest
import numpy as np
from mock import Mock
from bliss.common import scans
from bliss.controllers.mythen import Mythen
from bliss.controllers import mythenlib
@pytest.fixture
def run_command(monkeypatch):
setters = {}
commands = []
def run_command(sock, command, return_type="int", return_shape=(), payload=b""):
# General commands
if command == "-get version":
return "v3.0.0"
if command == "-get nmodules":
return 1
# Detect setters
args = command[1:].split(" ")
if args[0] == "readout":
assert args[1:] == ["1"]
args = args[:1]
if len(args) == 2 and args[0] != "get":
setters[args[0]] = args[1]
if len(args) == 1:
commands.append(args[0])
# Return values
if return_shape == ():
return {
"int": 0,
"float": 1.23e9 if "tau" in command else 4.56,
"long long": 31400000,
}[return_type]
if return_shape == (1,):
return np.array([run_command(sock, command, return_type, (), payload)])
if return_shape == (1, 1280):
return np.array(range(1280)).reshape(return_shape)
# Not managed
assert False
monkeypatch.setattr(mythenlib, "socket", Mock())
monkeypatch.setattr(mythenlib, "run_command", run_command)
run_command.setters = setters
run_command.commands = commands
yield run_command
def test_myten_basic(run_command):
m = Mythen("test", {"hostname": "mymythen"})
assert m.name == "test"
assert m.hostname == "mymythen"
m._interface._sock.connect.assert_called_once_with(("mymythen", 1030))
assert (
repr(m)
== """\
Mythen on mymythen:
nmodules = 1
delay_after_frame = 3.14
nframes = 0
nbits = 0
exposure_time = 3.14
energy = (4.56,)
threshold = (4.56,)
bad_channel_interpolation = False
flat_field_correction = False
rate_correction = False
rate_correction_deadtime = (1.23,)
continuous_trigger_mode = False
single_trigger_mode = False
delay_before_frame = 3.14
gate_mode = False
ngates = 0
input_polarity = 0
output_polarity = 0"""
)
m.finalize()
m._interface._sock.close.assert_called_once_with()
def test_myten_configuration(run_command):
Mythen("test", {"hostname": "mymythen", "energy": 8.88, "threshold": 4.44})
assert run_command.commands == []
assert run_command.setters == {"energy": "8.88", "kthresh": "4.44"}
def test_myten_reset_configuration(run_command):
Mythen("test", {"hostname": "mymythen", "apply_defaults": True, "gate_mode": True})
assert run_command.commands == ["reset"]
assert run_command.setters == {"gateen": "1"}
def test_mythen_readonly_getters(run_command):
m = Mythen("test", {"hostname": "mymythen"})
assert "get_version" in dir(m)
assert m.get_version() == (3, 0, 0)
def test_mythen_commands(run_command):
m = Mythen("test", {"hostname": "mymythen"})
m.reset()
m.start()
assert list(m.readout()) == list(range(1280))
m.stop()
assert run_command.commands == ["reset", "start", "readout", "stop"]
def test_mythen_run(run_command):
m = Mythen("test", {"hostname": "mymythen"})
data = list(m.run(10, 0.1))
assert len(data) == 10
for frame in data:
assert list(frame) == list(range(1280))
assert run_command.commands == ["start"] + ["readout"] * 10 + ["stop"]
assert run_command.setters == {"frames": "10", "time": "1000000"}
def test_myten_counter(run_command):
m = Mythen("test", {"hostname": "mymythen"})
counter = m.counters.spectrum
assert counter.name == "spectrum"
assert counter.dtype == np.int32
assert counter.shape == (1280,)
assert counter.controller == m
def test_mythen_from_config(run_command, beacon):
m = beacon.get("mythen1")
assert m.hostname == "mymythen"
assert run_command.commands == ["reset"]
def test_mythen_ct_scan(run_command, beacon):
mythen = beacon.get("mythen1")
scan = scans.ct(0.1, mythen, return_scan=True, save=False)
data = scan.get_data()["spectrum"]
assert data.shape == (1, 1280)
assert list(data[0]) == list(range(1280))
def test_mythen_default_chain_with_counter_namespace(run_command, beacon):
m0 = beacon.get("m0")
mythen = beacon.get("mythen1")
scan = scans.ascan(m0, 0, 10, 3, 0.1, mythen, return_scan=True, save=False)
data = scan.get_data()["spectrum"]
assert data.shape == (3, 1280)
assert np.array_equal(data, [list(range(1280))] * 3)
name: mythen1
module: mythen
class: Mythen
plugin: bliss
hostname: mymythen
apply_defaults: True
\ No newline at end of file
......@@ -23,6 +23,7 @@
- spec_m1
- mot_1_disable_cache
- mot_2_disable_cache
- mythen1
synoptic:
svg-file: ./test_synoptic.svg
elements:
......