Commit 8342fed4 authored by Cyril Guilloud's avatar Cyril Guilloud

added multi distributions and XCALIBU capacities:

* multi dist: user can define many distributions, they will be sum to create counter data.
* XCALIBU: user can give a calibaration file in config to load TABLE or POLYNOMIAL calib as distribution.
* class name to use: SimulationCounter
parent c8edbb53
Pipeline #11643 failed with stages
in 33 minutes and 40 seconds
......@@ -12,14 +12,21 @@ import pprint
from bliss.scanning.chain import AcquisitionDevice, AcquisitionChannel
from bliss.scanning.acquisition.counter import SamplingMode
from bliss.common.measurement import GroupedReadMixin, Counter
from bliss.common import session
# for logging
import logging
from bliss.common import mapping
from bliss.common.logtools import LogMixin
try:
import xcalibu
XCALIBU_IMPORTED = True
except:
XCALIBU_IMPORTED = False
"""
`simulation_counter` allows to define a fake counter.
`SimulationCounter` allows to define a fake counter.
This fake counter is usable in a `ct` or in a scan.
......@@ -31,12 +38,13 @@ It returns floats numbers that can be:
If included in a scan (except timescan/loopscan without predefined
number of points), it returns values according to a user defined
distribution:
sum of distribution(s).
types of the distribution can be:
* FLAT (constant value)
* GAUSSIAN
Parameters:
* <distribution>: 'GAUSSIAN' | 'FLAT'
* <noise_factor>:
......@@ -56,18 +64,26 @@ Parameters if using GAUSSIAN:
# configuration example:
"""
-
name: sim_ct
name: sim_ct_1
plugin: bliss
class: simulation_counter
distribution: GAUSSIAN
mu_offset: 1.0
sigma_factor: 1.0
height_factor: 1.0
noise_factor: 0.005
class: SimulationCounter
distributions:
- type: GAUSSIAN
name: g1
mu_offset: 1.0
sigma_factor: 1.0
height_factor: 1.0
noise_factor: 0.005
- type: GAUSSIAN
name: g2
mu_offset: -3.0
sigma_factor: 2.0
height_factor: 0.2
noise_factor: 0.005
-
name: sim_ct_2
plugin: bliss
class: simulation_counter
class: SimulationCounter
distribution: FLAT
height_factor: 1.0
noise_factor: 0.005
......@@ -110,17 +126,20 @@ dscan(m1,-1,1, 13, 0.01)
class SimulationCounter_AcquisitionDevice(AcquisitionDevice, LogMixin):
def __init__(self, counter, scan_param, distribution, gauss_param, noise_factor):
session.get_current().map.register(self)
def __init__(self, counter, scan_param, distributions):
self._logger.debug(
"SIMULATION_COUNTER_ACQ_DEV -- SimulationCounter_AcquisitionDevice()"
"SIMULATIONCOUNTER_ACQ_DEV -- SimulationCounter_AcquisitionDevice()"
)
# , gauss_param, noise_factor
self.counter = counter
self.scan_param = scan_param
self.distribution = distribution
self.gauss_param = gauss_param
self.noise_factor = noise_factor
self.distributions = distributions
# self.gauss_param = gauss_param
# self.noise_factor = noise_factor
self.scan_type = self.scan_param.get("type")
AcquisitionDevice.__init__(
......@@ -128,8 +147,8 @@ class SimulationCounter_AcquisitionDevice(AcquisitionDevice, LogMixin):
None,
counter.name,
npoints=self.scan_param.get("npoints"),
prepare_once=True, # Do not call prepare at each point.
start_once=True, # Do not call start at each point.
prepare_once=True, # Do not call prepare() at each point.
start_once=True, # Do not call start() at each point.
)
# add a new channel (data) to the acq dev.
......@@ -146,15 +165,15 @@ class SimulationCounter_AcquisitionDevice(AcquisitionDevice, LogMixin):
return False
def prepare(self):
self._logger.debug("SIMULATION_COUNTER_ACQ_DEV -- prepare()")
self._logger.debug("SIMULATIONCOUNTER_ACQ_DEV -- prepare()")
self._index = 0
#### Get scan paramerters
nbpoints = self.scan_param.get("npoints")
points_count = self.scan_param.get("npoints")
# npoints should be 0 only in case of timescan without 'npoints' parameter
if nbpoints == 0:
nbpoints = 1
if points_count == 0:
points_count = 1
if self.is_count_scan() or self.scan_type in ["pointscan"]:
# ct, timescan(without npoints), pointscan
......@@ -163,88 +182,190 @@ class SimulationCounter_AcquisitionDevice(AcquisitionDevice, LogMixin):
elif self.scan_type in ["loopscan", "timescan"]:
# no user defined start/stop or timescan-with-npoints
scan_start = 0
scan_stop = nbpoints
scan_stop = points_count
else:
# ascan etc.
scan_start = self.scan_param.get("start")[0]
scan_stop = self.scan_param.get("stop")[0]
self._logger.debug(
f"SIMULATION_COUNTER_ACQ_DEV -- prepare() -- type={self.scan_type} \
nbpoints={nbpoints} start={scan_start} stop={scan_stop}"
f"SIMULATIONCOUNTER_ACQ_DEV -- prepare() -- type={self.scan_type} \
points_count={points_count} start={scan_start} stop={scan_stop}"
)
#### Get gaussian distribution parameters
#### Generation of the distribution
# base data
self.data = np.ones(points_count)
# if self.is_count_scan() or self.distribution == "FLAT":
# self._logger.debug(
# "SIMULATIONCOUNTER_ACQ_DEV -- prepare() -- is count scan or FLAT"
# )
# else:
# self._logger.debug(
# "SIMULATIONCOUNTER_ACQ_DEV -- prepare() -- neither count nor FLAT"
# )
#
# Sums all distributions:
for dist in self.distributions:
dist.update({"scan_start": scan_start})
dist.update({"scan_stop": scan_stop})
(dist_data, stats) = self.get_distribution_data(dist, points_count)
self.data += dist_data
self._logger.debug("SIMULATIONCOUNTER_ACQ_DEV -- prepare() -- data=")
self._logger.debug(self.data)
mu_offset = self.gauss_param.get("mu_offset", 0.0)
sigma_factor = self.gauss_param.get("sigma_factor", 1.0)
self.height_factor = self.gauss_param.get("height_factor", 1.0)
self.counter.data = self.data
self._logger.debug(f"SIMULATIONCOUNTER_ACQ_DEV -- prepare() END")
_dbg_string = f"SIMULATION_COUNTER_ACQ_DEV -- prepare() -- distribution={self.distribution}"
_dbg_string += f"mu_offset={mu_offset:g} sigma_factor={sigma_factor}"
_dbg_string += (
f"height_factor={self.height_factor} noise_factor={self.noise_factor}"
)
self._logger.debug(_dbg_string)
def calc_gaussian(self, x, mu, sigma):
one_over_sqtr = 1.0 / np.sqrt(2.0 * np.pi * np.square(sigma))
exp = np.exp(-np.square(x - mu) / (2.0 * np.square(sigma)))
#### Generation of the distribution
# base data
if self.is_count_scan() or self.distribution == "FLAT":
self._logger.debug(
"SIMULATION_COUNTER_ACQ_DEV -- prepare() -- is count scan or FLAT"
)
self.data = np.ones(nbpoints)
else:
self._logger.debug(
"SIMULATION_COUNTER_ACQ_DEV -- prepare() -- neither count nor FLAT"
)
self.data = np.linspace(scan_start, scan_stop, nbpoints)
_val = one_over_sqtr * exp
self._logger.debug("SIMULATION_COUNTER_ACQ_DEV -- prepare() -- data(linspace)=")
self._logger.debug(self.data)
return _val
def get_distribution_data(self, distribution, points_count):
"""
Returns a numpy array with distribution of points according to
corresponding distribution.
"""
# creates distribution
if self.is_count_scan() or self.distribution == "FLAT":
self._logger.debug(f"SIMULATION_COUNTER_ACQ_DEV -- prepare() -- FLAT")
pass
if points_count == 1:
point_value = np.random.rand() * distribution.get("noise_factor", 1)
single_point_dist = [point_value]
stats = {"average": point_value}
return (single_point_dist, stats)
if distribution["type"] == "FLAT":
return self.get_flat_distribution(distribution, points_count)
elif distribution["type"] == "GAUSSIAN":
return self.get_gaussian_distribution(distribution, points_count)
elif distribution["type"] == "XCALIBU":
return self.get_xcalibu_distribution(distribution, points_count)
else:
self._logger.debug(
f"SIMULATION_COUNTER_ACQ_DEV -- prepare() -- GAUSSIAN -- start={scan_start} stop={scan_stop} nbpoints={nbpoints}"
)
self.data = self.gauss(self.data, mu_offset, sigma_factor)
raise ValueError("Unknown distribution type")
self._logger.debug("SIMULATION_COUNTER_ACQ_DEV -- prepare() -- data=")
self._logger.debug(self.data)
def get_flat_distribution(self, distribution, points_count):
"""
Parameters:
* <distribution>: a dict of parameters:
* <>: ???
* <points_count>: Number of points of the returned array
Returns a tuple: (flat_dist, stats)
* flat_dist: a numpy array (dim 1) of length <points_count>
* stats: a dict of statistics
* 'average': ???
* 'stdev': ???
"""
stats = dict()
f_dist = np.ones(points_count)
stats["average"] = 4.0
stats["stdev"] = 5.0
return (f_dist, stats)
def get_xcalibu_distribution(self, distribution, points_count):
"""
Parameters:
* <distribution>: a dict of parameters:
* <>: ???
* <points_count>: Number of points of the returned array
Returns a tuple: (x_dist, stats)
* x_dist: a numpy array (dim 1) of length <points_count> created using
xcalibu calibration
* stats: a dict of statistics
* 'average': ???
* 'stdev': ???
"""
stats = dict()
calib_file_name = distribution["xcalibu_file"]
from bliss.config.conductor.client import remote_open
with remote_open(calib_file_name) as calibration:
calibration_string = calibration.read()
calib = xcalibu.Xcalibu(
calib_string=calibration_string.decode(),
reconstruction_method="INTERPOLATION",
)
min_calib = calib.min_x()
max_calib = calib.max_x()
x_dist = np.linspace(min_calib, max_calib, points_count)
for ii in range(len(x_dist)):
x_dist[ii] = calib.get_y(x_dist[ii])
return (x_dist, stats)
def get_gaussian_distribution(self, distribution, points_count):
"""
Parameters:
* <distribution>: a dict of parameters:
* <mu_offset>:
* <sigma_factor>:
* <height_factor>:
* <noise_factor>:
* <points_count>: Number of points of the returned array
Returns a tuple: (gauss_dist, stats)
* gauss_dist: a numpy array (dim 1) of length <points_count>
* stats: a dict of statistics
"""
scan_start = distribution["scan_start"]
scan_stop = distribution["scan_stop"]
self._logger.debug("gaussian distrib requiered")
self._logger.debug(
f" param: nb{points_count} start{scan_start} stop{scan_stop}"
)
stats = dict()
_dist = np.linspace(scan_start, scan_stop, points_count)
mu_offset = distribution["mu_offset"]
sigma_factor = distribution["sigma_factor"]
height_factor = distribution["height_factor"]
noise_factor = distribution["noise_factor"]
g_dist = self.gauss(_dist, mu_offset, sigma_factor)
# applying Y factor.
self.data = self.data * self.height_factor
g_dist = g_dist * height_factor
self._logger.debug("self.data with height_factor=")
self._logger.debug(self.data)
self._logger.debug(g_dist)
# computing noise.
if self.is_count_scan():
noise = (np.random.rand(1)[0] * self.noise_factor) + 1
noise = (np.random.rand(1)[0] * noise_factor) + 1
else:
noise = (np.random.rand(nbpoints) * self.noise_factor) + 1
noise = (np.random.rand(points_count) * noise_factor) + 1
self._logger.debug("noise=")
self._logger.debug(noise)
# applying noise.
self.data = self.data * noise
self._logger.debug("self.data with noise=")
self._logger.debug(self.data)
self.counter.data = self.data
self._logger.debug(f"SIMULATION_COUNTER_ACQ_DEV -- prepare() END")
g_dist = g_dist * noise
self._logger.debug("g_dist with noise=")
self._logger.debug(g_dist)
def calc_gaussian(self, x, mu, sigma):
one_over_sqtr = 1.0 / np.sqrt(2.0 * np.pi * np.square(sigma))
exp = np.exp(-np.square(x - mu) / (2.0 * np.square(sigma)))
_val = one_over_sqtr * exp
stats["mu"] = 1.0
stats["sigma"] = 2.0
stats["fwhm"] = 3.0
return _val
return (g_dist, stats)
def gauss(self, x, mu_offset, sigma_factor):
"""
......@@ -256,6 +377,8 @@ class SimulationCounter_AcquisitionDevice(AcquisitionDevice, LogMixin):
"""
# x is: array([], shape=(1, 0), dtype=float64)
xmin = min(x)
xmax = max(x)
mu = (xmax + xmin) / 2.0
......@@ -273,24 +396,24 @@ class SimulationCounter_AcquisitionDevice(AcquisitionDevice, LogMixin):
self.fwhm = 2 * np.sqrt(2 * np.log(2)) * sigma # ~ 2.35 * sigma
self._logger.debug(
f"SIMULATION_COUNTER_ACQ_DEV -- xmin={xmin} xmax={xmax} mu_offset={mu_offset:g} mu={mu:g} sigma={sigma:g}"
f"SIMULATIONCOUNTER_ACQ_DEV -- xmin={xmin} xmax={xmax} mu_offset={mu_offset:g} mu={mu:g} sigma={sigma:g}"
)
_val = self.calc_gaussian(x, mu, sigma)
self._logger.debug(f"SIMULATION_COUNTER_ACQ_DEV -- gauss() -- returns {_val}")
self._logger.debug(f"SIMULATIONCOUNTER_ACQ_DEV -- gauss() -- returns {_val}")
return _val
def start(self):
self._logger.debug(f"SIMULATION_COUNTER_ACQ_DEV -- start()")
self._logger.debug(f"SIMULATIONCOUNTER_ACQ_DEV -- start()")
pass
def stop(self):
self._logger.debug("SIMULATION_COUNTER_ACQ_DEV -- stop()")
if self.distribution == "GAUSSIAN" and not self.is_count_scan():
print(
f"SIMULATION_COUNTER_ACQ_DEV -- (Theorical values) {self.name} mu={self.mu:g} sigma={self.sigma:g} fwhm={self.fwhm:g}"
)
self._logger.debug("SIMULATIONCOUNTER_ACQ_DEV -- stop()")
# if self.distribution_type == "GAUSSIAN" and not self.is_count_scan():
# print(
# f"SIMULATIONCOUNTER_ACQ_DEV -- (Theorical values) {self.name} mu={self.mu:g} sigma={self.sigma:g} fwhm={self.fwhm:g}"
# )
pass
def trigger(self):
......@@ -300,7 +423,7 @@ class SimulationCounter_AcquisitionDevice(AcquisitionDevice, LogMixin):
* called during timescan()
"""
self._logger.debug(
f"SIMULATION_COUNTER_ACQ_DEV -- **************** trigger() **************************"
f"SIMULATIONCOUNTER_ACQ_DEV -- **************** trigger() **************************"
)
if self._logger.isEnabledFor(logging.DEBUG):
print(self.data)
......@@ -313,7 +436,7 @@ class SimulationCounter_AcquisitionDevice(AcquisitionDevice, LogMixin):
if not self.is_count_scan():
self._index += 1
self._logger.debug(f"SIMULATION_COUNTER_ACQ_DEV -- trigger() END")
self._logger.debug(f"SIMULATIONCOUNTER_ACQ_DEV -- trigger() END")
class SimulationCounter(Counter, LogMixin):
......@@ -325,31 +448,35 @@ class SimulationCounter(Counter, LogMixin):
self.scan_pars = None
def create_acquisition_device(self, scan_pars):
self._logger.debug("SIMULATION_COUNTER -- create_acquisition_device")
self._logger.debug("SIMULATIONCOUNTER -- create_acquisition_device")
# list of distributions
distributions = self.config.get("distributions")
mu_offset = self.config.get("mu_offset", 0.0)
sigma_factor = self.config.get("sigma_factor", 1.0)
height_factor = self.config.get("height_factor", 1.0)
# mu_offset = self.config.get("mu_offset", 0.0)
# sigma_factor = self.config.get("sigma_factor", 1.0)
# height_factor = self.config.get("height_factor", 1.0)
gauss_param = {
"mu_offset": mu_offset,
"sigma_factor": sigma_factor,
"height_factor": height_factor,
}
# gauss_param = {
# "mu_offset": mu_offset,
# "sigma_factor": sigma_factor,
# "height_factor": height_factor,
# }
self.acq_device = SimulationCounter_AcquisitionDevice(
self,
scan_param=scan_pars,
distribution=self.config.get("distribution", "FLAT"),
gauss_param=gauss_param,
noise_factor=self.config.get("noise_factor", 0.0),
distributions=distributions
# self.config.get("distribution", "FLAT"),
# gauss_param=gauss_param,
# noise_factor=self.config.get("noise_factor", 0.0),
)
self._logger.debug("SIMULATION_COUNTER -- COUNTER CONFIG")
self._logger.debug("SIMULATIONCOUNTER -- COUNTER CONFIG")
if self._logger.isEnabledFor(logging.DEBUG):
pprint.pprint(self.config)
self._logger.debug("SIMULATION_COUNTER -- SCAN_PARS")
self._logger.debug("SIMULATIONCOUNTER -- SCAN_PARS")
if self._logger.isEnabledFor(logging.DEBUG):
pprint.pprint(scan_pars)
......@@ -362,15 +489,15 @@ class SimulationCounter(Counter, LogMixin):
self.scan_pars = scan_pars
self._logger.debug("SIMULATION_COUNTER -- create_acquisition_device END")
self._logger.debug("SIMULATIONCOUNTER -- create_acquisition_device END")
return self.acq_device
def get_acquisition_device(self):
self._logger.debug("SIMULATION_COUNTER -- get_acquisition_device()")
self._logger.debug("SIMULATIONCOUNTER -- get_acquisition_device()")
return self.acq_device
def read(self):
self._logger.debug("SIMULATION_COUNTER -- read()")
self._logger.debug("SIMULATIONCOUNTER -- read()")
return 33
# If no controller, a warning is emited in `master_to_devices_mapping()`
......
# BLISS simulation devices configuration
This chapter explains how to configure simulation BLISS devices:
......@@ -57,10 +58,9 @@ controller:
## Counter
A pretty generic simulation counter is provided by
`simulation_counter` module to define a fake counter.
A pretty generic simulation counter is provided by `SimulationCounter` module.
This fake counter is usable in a `ct` or in a [default
Such a simulation counter is usable in a `ct` or in a [default
scan](scan_default.md).
It returns floats numbers that can be:
......@@ -68,22 +68,25 @@ It returns floats numbers that can be:
* constant
* random
* following a gaussian distribution
* following a calibration defined by an xcalibu calibration file.
* see: https://gitlab.esrf.fr/bliss/xcalibu
If included in a scan (except timescan/loopscan without predefined
number of points), it returns values according to a user defined
distribution:
If included in a scan (except timescan/loopscan without predefined number of
points), it returns values according to a user defined distribution:
* FLAT (constant value)
* GAUSSIAN
* FLAT: constant value
* GAUSSIAN: gaussian distribution
* XCALIBU: xcalibu calibration defined by polynom or by a table
If included in a `ct` or a `timescan`, it returns either a constant
If included in a `ct` or a `timescan`, it returns either a constant or a random
value.
Returned values can be altered by adding a random "noise".
### Parameters
* `<distribution>`: 'GAUSSIAN' | 'FLAT'
* `<distributions>`: list of distributions
* `<type>`: 'GAUSSIAN' | 'FLAT' | 'XCALIBU'
* `<noise_factor>`:
* `>= 0.0`
* add a random noise to the distribution
......@@ -98,16 +101,14 @@ Parameters if using GAUSSIAN:
* `<mu_offset>`: shitfs mean value by `<mu_offset> `(X-offset)
* `<sigma_factor>`: standard deviation adjustement factor.
!!! note
TODO: adding an option to be able to furnish to counter a
user-defined array for tests on a deterministic curve.
Parameters if using XCALIBU:
* `<xcalibu_file>`: name of the file to use. File must be Beacon-accessible (ex:
in `...local/beamline_configuration/`)
### Examples
`sim_ct_1` counter is configured to generate a gaussian curve:
`sim_ct_gauss` counter is configured to generate a gaussian curve:
* centered in 0 (mu = 0.0)
* with a standard deviation (fwhm = ~2.35 * sigma) of 1 (sigma_factor = 1.0)
......@@ -118,74 +119,115 @@ NB: the real height depends also on the sigma value.
```yaml
-
name: sim_ct_1
name: sim_ct_gauss
plugin: bliss
class: SimulationCounter
distribution: GAUSSIAN
mu_offset: 0.0
sigma_factor: 1.0
height_factor: 100.0
noise_factor: 0.0
distributions:
- type: GAUSSIAN
mu_offset: 0.0
sigma_factor: 1.0
height_factor: 100.0
noise_factor: 0.0
```
`sim_ct_2` counter is configured to generate a noisy gaussian curve:
`sim_ct_gauss_noise` counter is configured to generate a noisy gaussian curve:
* centered in -1 (mu = -1.0)
* with a standard deviation of 0.4 (sigma_factor = 0.4) (narrower than sim_ct_1's curve)
* with a standard deviation of 0.4 (sigma_factor = 0.4) (narrower than sim_ct_gauss's curve)
* scaled in height by 100 ( height_factor: 100.0)
* with a noise factor of 0.1
```yaml
-
name: sim_ct_2
name: sim_ct_gauss_noise
plugin: bliss
class: SimulationCounter
distribution: GAUSSIAN
mu_offset: -1.0
sigma_factor: 0.4
height_factor: 100.0
noise_factor: 0.1
distributions:
- type: GAUSSIAN
mu_offset: -1.0
sigma_factor: 0.4
height_factor: 100.0
noise_factor: 0.1
```
`sim_ct_3` counter is configured to depict a constant value:
`sim_ct_flat_12` counter is configured to depict a constant value:
* of value 12.0 ( height_factor: 12.0)
* without noise (noise_factor: 0.0)
```yaml
-
name: sim_ct_3
name: sim_ct_flat_12
plugin: bliss
class: SimulationCounter
distribution: FLAT
height_factor: 12.0
noise_factor: 0.0
distributions:
- type: FLAT
height_factor: 12.0
noise_factor: 0.0
```
`sim_ct_4` counter is configured to depict a random value:
`sim_ct_rand_12` counter is configured to depict a random value:
* with a base line of 12.0 ( height_factor: 12.0)
* with positive noise (noise_factor: 1.01)
```yaml
-
name: sim_ct_4
name: sim_ct_rand_12
plugin: bliss
class: SimulationCounter
distributions:
- type: FLAT
height_factor: 12.0
noise_factor: 1.01
```
`sim_ct_double_gauss` counter is configured to depict a sum of two gaussian
distributions.
```yaml
-
name: sim_ct_double_gauss
plugin: bliss
class: SimulationCounter
distribution: FLAT
height_factor: 12.0
noise_factor: 1.01
distributions:
- type: GAUSSIAN
mu_offset: 1.0
sigma_factor: 1.0
height_factor: 1.0
noise_factor: 0.0
- type: GAUSSIAN
mu_offset: -8.0
sigma_factor: 1.0
height_factor: 2.0
noise_factor: 0.0
```
`sim_ct_xcalibu` counter is configured to depict a pre-defined distribution
defined as a xcalibu calibration from `gauss.calib` file (located in beacon
configuration directory.
```yaml
-
name: sim_ct_xcalibu
plugin: bliss
class: SimulationCounter
distributions:
- type: XCALIBU
xcalibu_file: "gauss.calib"
```
## MCA
To create a simulation MCA, just use `SimulatedMCA` class:
name: simul_mca
module: mca
class: SimulatedMCA
plugin: bliss
```yaml
name: simul_mca
module: mca
class: SimulatedMCA
plugin: bliss
```
## Lima Device
......
......@@ -37,3 +37,4 @@ tabulate
tango >= 9.3.2
tmux >= 2.7
treelib
xcalibu
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import pytest
import os