Commit 4cebcdd1 authored by Vincent Michel's avatar Vincent Michel

Add mythen controller and counter

parent 5e1c11db
"""Controller for the mythen 1d detector."""
import numpy
from .mythenlib import MythenInterface
from bliss.common.measurement import IntegratingCounter, counter_namespace
def interface_property(name, mode=False):
getter_name = name + "_enabled" if mode else "get_" + name
setter_name = "enable_" + name if mode else "set_" + name
assert getter_name in dir(MythenInterface)
assert setter_name in dir(MythenInterface)
def getter(self):
return getattr(self._interface, getter_name)()
def setter(self, value):
return getattr(self._interface, setter_name)(value)
return property(getter, setter)
class Mythen(object):
_settings = [
# General configuration
"nmodules",
# Acquisition configuration
"delay_after_frame",
"nframes",
"nbits",
"exposure_time",
# Detector configuration
"energy",
"threshold",
# Data correction settings
"bad_channel_interpolation",
"flat_field_correction",
"rate_correction",
"rate_correction_deadtime",
# Trigger / Gate settings
"continuous_trigger_mode",
"single_trigger_mode",
"delay_before_frame",
"gate_mode",
"ngates",
"input_polarity",
"output_polarity",
]
def __init__(self, name, config):
self._name = name
self._config = config
self._hostname = config["hostname"]
self._interface = MythenInterface(self._hostname)
self._apply_configuration()
self._counter = MythenCounter(self)
def finalize(self):
self._interface.close()
# Counter access
@property
def counters(self):
return counter_namespace([self._counter])
# Manage configuration
def _apply_configuration(self):
if self._config.get("apply_defaults"):
self.reset()
for key, value in self._config.items():
if key in self._settings:
setattr(self, key, value)
def _get_configuration(self):
return [(key, getattr(self, key)) for key in self._settings]
def __repr__(self):
lines = ["Mythen on {}:".format(self._hostname)]
lines += [
" {:<25s} = {}".format(key, value)
for key, value in self._get_configuration()
]
return "\n".join(lines)
# Bliss properties
@property
def name(self):
return self._name
@property
def hostname(self):
return self._hostname
# General configuration
nmodules = interface_property("nmodules")
# Acquisition configuration
delay_after_frame = interface_property("delayafterframe")
nframes = interface_property("nframes")
nbits = interface_property("nbits")
exposure_time = interface_property("exposure_time")
# Detector configuration
energy = interface_property("energy")
threshold = interface_property("kthresh")
# Data correction configuration
bad_channel_interpolation = interface_property("badchannelinterpolation", mode=True)
flat_field_correction = interface_property("flatfieldcorrection", mode=True)
rate_correction = interface_property("ratecorrection", mode=True)
rate_correction_deadtime = interface_property("ratecorrection_deadtime")
# Trigger / Gate configuration
continuous_trigger_mode = interface_property("continuoustrigger", mode=True)
single_trigger_mode = interface_property("singletrigger", mode=True)
gate_mode = interface_property("gatemode", mode=True)
delay_before_frame = interface_property("delaybeforeframe")
ngates = interface_property("ngates")
input_polarity = interface_property("inputpolarity")
output_polarity = interface_property("outputpolarity")
# Expose all interface getters
def __getattr__(self, attr):
if attr.startswith("get_") and attr in dir(self._interface):
return getattr(self._interface, attr)
raise AttributeError(attr)
def __dir__(self):
lst = list(self.__dict__)
lst += dir(type(self))
lst += [key for key in dir(self._interface) if key.startswith("get_")]
return sorted(set(lst))
# Commands
def reset(self):
self._interface.reset()
def start(self):
self._interface.start()
def stop(self):
self._interface.stop()
def readout(self):
return self._interface.readout(1)[0]
# Acquisition routine
def run(self, acquisition_number=1, acquisition_time=1.):
self.nframes = acquisition_number
self.exposure_time = acquisition_time
try:
self.start()
for _ in range(acquisition_number):
yield self.readout()
finally:
self.stop()
# Mythen counter
class MythenCounter(IntegratingCounter):
def __init__(self, controller):
self.acquisition_device = None
super(MythenCounter, self).__init__("spectrum", controller)
# Data properties
@property
def dtype(self):
return numpy.int32
@property
def shape(self):
return (self.controller.get_nchannels(),)
# Get acquisition device
def create_acquisition_device(self, scan_pars, **settings):
self.acquisition_device = super(MythenCounter, self).create_acquisition_device(
scan_pars, **settings
)
return self.acquisition_device
# Flow control
def prepare(self):
self.nframes = self.acquisition_device.npoints
self.exposure_time = self.acquisition_device.count_time
def start(self):
self.controller.start()
def get_value(self, from_index):
return self.controller.readout()
def stop(self):
self.controller.stop()
self.acquisition_device = None
import pytest
import numpy as np
from mock import Mock
from bliss.common import scans
from bliss.controllers.mythen import Mythen
from bliss.controllers import mythenlib
@pytest.fixture
def run_command(monkeypatch):
setters = {}
commands = []
def run_command(sock, command, return_type="int", return_shape=(), payload=b""):
# General commands
if command == "-get version":
return "v3.0.0"
if command == "-get nmodules":
return 1
# Detect setters
args = command[1:].split(" ")
if args[0] == "readout":
assert args[1:] == ["1"]
args = args[:1]
if len(args) == 2 and args[0] != "get":
setters[args[0]] = args[1]
if len(args) == 1:
commands.append(args[0])
# Return values
if return_shape == ():
return {
"int": 0,
"float": 1.23e9 if "tau" in command else 4.56,
"long long": 31400000,
}[return_type]
if return_shape == (1,):
return np.array([run_command(sock, command, return_type, (), payload)])
if return_shape == (1, 1280):
return np.array(range(1280)).reshape(return_shape)
# Not managed
assert False
monkeypatch.setattr(mythenlib, "socket", Mock())
monkeypatch.setattr(mythenlib, "run_command", run_command)
run_command.setters = setters
run_command.commands = commands
yield run_command
def test_myten_basic(run_command):
m = Mythen("test", {"hostname": "mymythen"})
assert m.name == "test"
assert m.hostname == "mymythen"
m._interface._sock.connect.assert_called_once_with(("mymythen", 1030))
assert (
repr(m)
== """\
Mythen on mymythen:
nmodules = 1
delay_after_frame = 3.14
nframes = 0
nbits = 0
exposure_time = 3.14
energy = (4.56,)
threshold = (4.56,)
bad_channel_interpolation = False
flat_field_correction = False
rate_correction = False
rate_correction_deadtime = (1.23,)
continuous_trigger_mode = False
single_trigger_mode = False
delay_before_frame = 3.14
gate_mode = False
ngates = 0
input_polarity = 0
output_polarity = 0"""
)
m.finalize()
m._interface._sock.close.assert_called_once_with()
def test_myten_configuration(run_command):
Mythen("test", {"hostname": "mymythen", "energy": 8.88, "threshold": 4.44})
assert run_command.commands == []
assert run_command.setters == {"energy": "8.88", "kthresh": "4.44"}
def test_myten_reset_configuration(run_command):
Mythen("test", {"hostname": "mymythen", "apply_defaults": True, "gate_mode": True})
assert run_command.commands == ["reset"]
assert run_command.setters == {"gateen": "1"}
def test_mythen_readonly_getters(run_command):
m = Mythen("test", {"hostname": "mymythen"})
assert "get_version" in dir(m)
assert m.get_version() == (3, 0, 0)
def test_mythen_commands(run_command):
m = Mythen("test", {"hostname": "mymythen"})
m.reset()
m.start()
assert list(m.readout()) == list(range(1280))
m.stop()
assert run_command.commands == ["reset", "start", "readout", "stop"]
def test_mythen_run(run_command):
m = Mythen("test", {"hostname": "mymythen"})
data = list(m.run(10, 0.1))
assert len(data) == 10
for frame in data:
assert list(frame) == list(range(1280))
assert run_command.commands == ["start"] + ["readout"] * 10 + ["stop"]
assert run_command.setters == {"frames": "10", "time": "1000000"}
def test_myten_counter(run_command):
m = Mythen("test", {"hostname": "mymythen"})
counter = m.counters.spectrum
assert counter.name == "spectrum"
assert counter.dtype == np.int32
assert counter.shape == (1280,)
assert counter.controller == m
def test_mythen_from_config(run_command, beacon):
m = beacon.get("mythen1")
assert m.hostname == "mymythen"
assert run_command.commands == ["reset"]
def test_mythen_ct_scan(run_command, beacon):
mythen = beacon.get("mythen1")
scan = scans.ct(0.1, mythen, return_scan=True, save=False)
data = scan.get_data()["spectrum"]
assert data.shape == (1, 1280)
assert list(data[0]) == list(range(1280))
def test_mca_default_chain_with_counter_namespace(run_command, beacon):
m0 = beacon.get("m0")
mythen = beacon.get("mythen1")
scan = scans.ascan(m0, 0, 10, 3, 0.1, mythen, return_scan=True, save=False)
data = scan.get_data()["spectrum"]
assert data.shape == (3, 1280)
assert np.array_equal(data, [list(range(1280))] * 3)
name: mythen1
module: mythen
class: Mythen
plugin: bliss
hostname: mymythen
apply_defaults: True
\ No newline at end of file
......@@ -23,6 +23,7 @@
- spec_m1
- mot_1_disable_cache
- mot_2_disable_cache
- mythen1
synoptic:
svg-file: ./test_synoptic.svg
elements:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment