Commit d4a42574 authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'sim1d' into 'master'

Create a simulated 1D counter

See merge request !3689
parents f2a2b1ff 803de542
Pipeline #46755 failed with stages
in 152 minutes and 29 seconds
......@@ -5,13 +5,13 @@
# Copyright (c) 2015-2020 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import numpy as np
import numpy
from scipy import signal
from scipy.special import erf
from bliss.scanning.chain import AcquisitionSlave
from bliss.common.counter import Counter, SoftCounter
from bliss.controllers.counter import CounterController
from bliss.common.counter import Counter, SoftCounter, SamplingMode, SamplingCounter
from bliss.controllers.counter import CounterController, SamplingCounterController
from bliss.common.soft_axis import SoftAxis
from bliss.common.protocols import counter_namespace
......@@ -183,7 +183,7 @@ class SimulationCounterAcquisitionSlave(AcquisitionSlave):
log_debug(self, "SIMULATION_COUNTER_ACQ_DEV -- prepare()")
self._index = 0
#### Get scan paramerters
#### Get scan parameters
nbpoints = self.npoints
# npoints should be 0 only in case of timescan without 'npoints' parameter
......@@ -221,14 +221,14 @@ class SimulationCounterAcquisitionSlave(AcquisitionSlave):
#### Generation of the distribution
if self.distribution == "FLAT":
self.data = np.ones(nbpoints) * self.shape_param["height_factor"]
self.data = numpy.ones(nbpoints) * self.shape_param["height_factor"]
elif self.distribution == "LINEAR":
xdata = np.linspace(scan_start, scan_stop, nbpoints)
xdata = numpy.linspace(scan_start, scan_stop, nbpoints)
self.data = (
xdata * self.shape_param["sigma_factor"] + self.shape_param["mu_offset"]
)
else:
xdata = np.linspace(scan_start, scan_stop, nbpoints)
xdata = numpy.linspace(scan_start, scan_stop, nbpoints)
self.data = self.gauss(
xdata, self.shape_param["mu_offset"], self.shape_param["sigma_factor"]
)
......@@ -239,9 +239,9 @@ class SimulationCounterAcquisitionSlave(AcquisitionSlave):
# applying noise
if self.timescan_or_ct():
noise = (np.random.rand(1)[0] * self.noise_factor) + 1
noise = (numpy.random.rand(1)[0] * self.noise_factor) + 1
else:
noise = (np.random.rand(nbpoints) * self.noise_factor) + 1
noise = (numpy.random.rand(nbpoints) * self.noise_factor) + 1
self.data = self.data * noise
log_debug_data(
self, "SIMULATION_COUNTER_ACQ_DEV -- prepare() -- data+noise=", self.data
......@@ -252,8 +252,8 @@ class SimulationCounterAcquisitionSlave(AcquisitionSlave):
log_debug(self, "SIMULATION_COUNTER_ACQ_DEV -- prepare() END")
def calc_gaussian(self, x, mu, sigma):
one_over_sqtr = 1.0 / np.sqrt(2.0 * np.pi * np.square(sigma))
exp = np.exp(-np.square(x - mu) / (2.0 * np.square(sigma)))
one_over_sqtr = 1.0 / numpy.sqrt(2.0 * numpy.pi * numpy.square(sigma))
exp = numpy.exp(-numpy.square(x - mu) / (2.0 * numpy.square(sigma)))
_val = one_over_sqtr * exp
......@@ -283,7 +283,7 @@ class SimulationCounterAcquisitionSlave(AcquisitionSlave):
sigma = sigma_factor * (xmax - xmin) / 6.0
self.sigma = sigma
self.fwhm = 2 * np.sqrt(2 * np.log(2)) * sigma # ~ 2.35 * sigma
self.fwhm = 2 * numpy.sqrt(2 * numpy.log(2)) * sigma # ~ 2.35 * sigma
log_debug(
self,
......@@ -401,88 +401,131 @@ class SimulationCounter(Counter):
self.config = config
class FixedShapeCounter:
"""Counter which generates a signal of predefined shape.
The predefined shape can be obtained by scanning the
associated software axis from 0 to 1:
class _Signal:
"""Helper to provide a set of predefined 1D signals
s = ascan(self.axis, 0, 1, self.npoints, expo, self.counter)
Arguments:
name: Name of the shape of the signal. See `SIGNALS`.
npoints: Number of points generated for this signal
coef: Coefficient applied to the generator
poissonian: If true the signal is randomized with poissonian filter
"""
@staticmethod
def _missing_edge_of_gaussian_left(npoints, frac_missing):
p = npoints // 2
p2 = int(p * frac_missing)
return np.concatenate(
(signal.gaussian(p, .1 * npoints)[p2:], np.zeros(p2), np.zeros(npoints - p))
return numpy.concatenate(
(
signal.gaussian(p, .1 * npoints)[p2:],
numpy.zeros(p2),
numpy.zeros(npoints - p),
)
)
SIGNALS = {
"sawtooth": lambda npoints: signal.sawtooth(
np.arange(0, 2 * np.pi * 1.1, 2 * np.pi * 1.1 / npoints), width=.9
numpy.arange(0, 2 * numpy.pi * 1.1, 2 * numpy.pi * 1.1 / npoints), width=.9
),
"gaussian": lambda npoints: signal.gaussian(npoints, .2 * npoints),
"flat": lambda npoints: np.ones(npoints),
"off_center_gaussian": lambda npoints: np.concatenate(
"flat": lambda npoints: numpy.ones(npoints),
"off_center_gaussian": lambda npoints: numpy.concatenate(
(
np.zeros(npoints - npoints // 2),
numpy.zeros(npoints - npoints // 2),
signal.gaussian(npoints // 2, .1 * npoints),
)
),
"missing_edge_of_gaussian_left": lambda npoints: FixedShapeCounter._missing_edge_of_gaussian_left(
"missing_edge_of_gaussian_left": lambda npoints: _Signal._missing_edge_of_gaussian_left(
npoints, 0.25
),
"missing_edge_of_gaussian_right": lambda npoints: FixedShapeCounter._missing_edge_of_gaussian_left(
"missing_edge_of_gaussian_right": lambda npoints: _Signal._missing_edge_of_gaussian_left(
npoints, 0.25
)[
::-1
],
"half_gaussian_right": lambda npoints: FixedShapeCounter._missing_edge_of_gaussian_left(
"half_gaussian_right": lambda npoints: _Signal._missing_edge_of_gaussian_left(
npoints, 0.4
),
"half_gaussian_left": lambda npoints: FixedShapeCounter._missing_edge_of_gaussian_left(
"half_gaussian_left": lambda npoints: _Signal._missing_edge_of_gaussian_left(
npoints, 0.4
)[
::-1
],
"triangle": lambda npoints: np.concatenate(
)[::-1],
"triangle": lambda npoints: numpy.concatenate(
(
np.arange(0, 1, 1 / (npoints // 2)),
np.flip(np.arange(0, 1, 1 / (npoints - npoints // 2))),
numpy.arange(0, 1, 1 / (npoints // 2)),
numpy.flip(numpy.arange(0, 1, 1 / (npoints - npoints // 2))),
)
),
"square": lambda npoints: np.concatenate(
"square": lambda npoints: numpy.concatenate(
(
np.zeros(npoints // 3),
np.ones(npoints // 3),
np.zeros(npoints - 2 * (npoints // 3)),
numpy.zeros(npoints // 3),
numpy.ones(npoints // 3),
numpy.zeros(npoints - 2 * (npoints // 3)),
)
),
"bimodal": lambda npoints: np.concatenate(
"bimodal": lambda npoints: numpy.concatenate(
(
signal.gaussian(npoints - npoints // 2, .15 * npoints) * 1.5,
signal.gaussian(npoints // 2, .15 * npoints),
)
),
"step_down": lambda npoints: np.concatenate(
(np.ones(npoints // 2), np.zeros(npoints - npoints // 2))
"step_down": lambda npoints: numpy.concatenate(
(numpy.ones(npoints // 2), numpy.zeros(npoints - npoints // 2))
),
"step_up": lambda npoints: np.concatenate(
(np.zeros(npoints // 2), np.ones(npoints - npoints // 2))
"step_up": lambda npoints: numpy.concatenate(
(numpy.zeros(npoints // 2), numpy.ones(npoints - npoints // 2))
),
"erf_down": lambda npoints: 1 - erf(np.arange(-3, 3, 6 / (npoints))),
"erf_up": lambda npoints: erf(np.arange(-3, 3, 6 / (npoints))),
"erf_down": lambda npoints: 1 - erf(numpy.arange(-3, 3, 6 / (npoints))),
"erf_up": lambda npoints: erf(numpy.arange(-3, 3, 6 / (npoints))),
"inverted_gaussian": lambda npoints: 1 - signal.gaussian(npoints, .2 * npoints),
"expo_gaussian": lambda npoints: np.exp(
"expo_gaussian": lambda npoints: numpy.exp(
signal.gaussian(npoints, .1 * npoints) * 30
),
"linear_up": lambda npoints: numpy.linspace(0, 1, num=npoints),
}
def __init__(self, signal="sawtooth", npoints=50):
def __init__(
self,
name: str = "sawtooth",
npoints: int = 50,
poissonian: bool = False,
coef: float = 1.0,
):
if name not in self.SIGNALS:
raise RuntimeError(f"Signal name '{name}' undefined")
self.name = name
self.npoints = npoints
self.poissonian = poissonian
self.coef = coef
def compute(self) -> numpy.ndarray:
generator = self.SIGNALS[self.name]
signal = generator(self.npoints)
if self.coef != 1:
signal = signal * self.coef
if self.poissonian:
signal = numpy.random.poisson(signal)
return signal
class FixedShapeCounter:
"""Counter which generates a signal of predefined shape.
The predefined shape can be obtained by scanning the
associated software axis from 0 to 1:
.. code-block:: python
dev = FixedShapeCounter()
s = ascan(dev.axis, 0, 1, dev.npoints, expo, dev.counter)
"""
SIGNALS = set(_Signal.SIGNALS.keys())
def __init__(self, signal: str = "sawtooth", npoints: int = 50):
self._axis = SoftAxis("TestAxis", self)
self._counter = SoftCounter(self)
self._npoints = npoints
self.signal = signal
self._signal = _Signal(name=signal, npoints=npoints)
self._position = 0
@property
......@@ -508,16 +551,16 @@ class FixedShapeCounter:
ascan of the associated axis between 0 and 1. This ensures
the predefined signal shape.
"""
self._data = self.SIGNALS[self._signal](self._npoints)
self._data = self._signal.compute()
@property
def signal(self):
return self._signal
return self._signal.name
@signal.setter
def signal(self, value):
assert value in self.SIGNALS
self._signal = value
self._signal.name = value
self.init_signal()
@property
......@@ -546,6 +589,55 @@ class FixedShapeCounter:
return self._axis
class OneDimSimulationController(SamplingCounterController):
"""Controller to handle `OneDimSimulationCounter`"""
def read(self, counter):
return counter._get_data()
class OneDimSimulationCounter(SamplingCounter):
"""Create a 1D simulated counter.
Arguments:
signal: Name of the signal shape to generate. See `_Signal.SIGNALS`
size: Number of points to generate
coef: Coefficient applied to the generator
poissonian: If true the signal is randomized with poissonian filter
"""
def __init__(
self,
name: str,
controller: OneDimSimulationController,
conversion_function=None,
unit=None,
signal: str = "sawtooth",
coef: float = 1.0,
poissonian: bool = False,
size: int = 32,
):
SamplingCounter.__init__(
self,
name=name,
mode=SamplingMode.SINGLE,
unit=unit,
conversion_function=conversion_function,
controller=controller,
)
self._size = size
self._signal = _Signal(
name=signal, npoints=size, poissonian=poissonian, coef=coef
)
def _get_data(self):
return self._signal.compute()
@property
def shape(self):
return (self._size,)
class AutoFilterDetMon:
"""Simulated monitor (flat signal) and detector counter with
has a gaussian shape with maximum :code:`monitor * transmission`.
......@@ -593,7 +685,7 @@ class AutoFilterDetMon:
def init_signal(self):
n = self._npoints + 1
stdev = .1 * self._npoints + 1
self._data = np.exp(signal.gaussian(n, stdev) * 10)
self._data = numpy.exp(signal.gaussian(n, stdev) * 10)
@property
def npoints(self):
......
......@@ -343,15 +343,10 @@ class SamplingCounterAcquisitionSlave(BaseCounterAcquisitionSlave):
else:
# SINGLE_COUNT case
acc_value = numpy.array(
[
counters[i].conversion_function(x)
for i, x in enumerate(self.device.read_all(*self._counters))
],
dtype=numpy.double,
ndmin=1,
)
samples = [[v] for v in acc_value]
data = [
counters[i].conversion_function(x)
for i, x in enumerate(self.device.read_all(*self._counters))
]
acc_read_time = 0
nb_read = 1
......@@ -373,9 +368,15 @@ class SamplingCounterAcquisitionSlave(BaseCounterAcquisitionSlave):
stats.append(st)
# apply the necessary operation per channel to convert the read data depending on the mode of each channel
data = self.apply_vectorized(
self.mode_helpers, acc_value, stats, samples, nb_read, self.count_time
)
if not self._SINGLE_COUNT:
data = self.apply_vectorized(
self.mode_helpers,
acc_value,
stats,
samples,
nb_read,
self.count_time,
)
for counter, channel_list in self._counters.items():
if counter.mode == SamplingMode.SAMPLES:
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2020 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import numpy
from bliss.controllers.simulation_counter import (
OneDimSimulationCounter,
OneDimSimulationController,
)
from bliss.common.counter import SamplingMode
from bliss.common.scans import loopscan
def test_onedim_simulation_counter__linear_up():
controller = OneDimSimulationController(name="onedim")
c1 = OneDimSimulationCounter(
"c1", controller=controller, signal="linear_up", coef=10, size=5
)
data = controller.read(c1)
expected = numpy.array([0, 2.5, 5, 7.5, 10])
numpy.testing.assert_array_equal(data, expected)
def test_onedim_simulation_counter__gaussian():
controller = OneDimSimulationController(name="onedim")
c1 = OneDimSimulationCounter(
"c1", controller=controller, signal="gaussian", coef=10, size=5
)
data = controller.read(c1)
expected = numpy.array([1.35, 6.07, 10., 6.07, 1.35])
numpy.testing.assert_array_almost_equal(data, expected, decimal=2)
def test_onedim_simulation_counter__acquisition(session):
"""Check acaquisition from a OneDimSimulationCounter"""
controller = OneDimSimulationController(name="onedim")
counter = OneDimSimulationCounter(
"onedim", controller, signal="linear_up", coef=10, size=5
)
assert counter.mode == SamplingMode.SINGLE
loops = loopscan(5, .1, counter, save=False)
data = loops.get_data()["onedim"]
expected = numpy.array([0., 2.5, 5., 7.5, 10.])
assert len(data) == 5
numpy.testing.assert_array_equal(data[0], expected)
numpy.testing.assert_array_equal(data[-1], expected)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment