Commit 7bb42645 authored by payno's avatar payno
Browse files

Add unit system using pint

# Conflicts:
#	est/core/process/
#	est/core/process/pymca/test/
#	est/core/process/test/
#	est/core/test/
#	est/io/
#	est/

# Conflicts:
#	est/core/
#	requirements.txt
parent 58e541a2
......@@ -33,6 +33,7 @@ from import read_xas, write_xas, get_xasproc
from est.core.types import XASObject
from import DataUrl
from est.core.types import Dim
from est.units import ur, convert_to
import h5py
import logging
......@@ -45,7 +46,8 @@ DEFAULT_CHANNEL_PATH = "/data/NXdata/Channel"
DEFAULT_CONF_PATH = "/configuration"
def read(spectra_url, channel_url, config_url=None, dimensions=None):
def read(spectra_url, channel_url, config_url=None, dimensions=None,
:param DataUrl spectra_url: data url to the spectra
......@@ -68,10 +70,11 @@ def read(spectra_url, channel_url, config_url=None, dimensions=None):
def read_frm_file(file_path):
def read_frm_file(file_path, energy_unit=ur.eV):
:param str file_path: path to the file containing the spectra. Must ba a
......@@ -80,24 +83,26 @@ def read_frm_file(file_path):
:rtype: XASObject
reader = XASReader()
return reader.read_from_file(file_path=file_path)
return reader.read_from_file(file_path=file_path, energy=energy_unit)
class XASReader(object):
"""Simple reader of a xas file"""
def read_frm_url(spectra_url, channel_url, dimensions=None, config_url=None):
def read_frm_url(spectra_url, channel_url, dimensions=None,
config_url=None, energy_unit=ur.eV):
sp, en, conf = read_xas(
return XASObject(spectra=sp, energy=en, configuration=conf)
def read_from_file(file_path):
def read_from_file(file_path, energy_unit=ur.eV):
:param str file_path:
......@@ -107,11 +112,13 @@ class XASReader(object):
return XASReader.read_frm_url(
spectra_url=DataUrl(file_path=file_path, scheme="PyMca"),
channel_url=DataUrl(file_path=file_path, scheme="PyMca"),
elif file_path.endswith(".xmu"):
return XASReader.read_frm_url(
spectra_url=DataUrl(file_path=file_path, scheme="larch"),
channel_url=DataUrl(file_path=file_path, scheme="larch"),
elif h5py.is_hdf5(file_path):
return XASReader.read_frm_url(
......@@ -124,6 +131,7 @@ class XASReader(object):
file_path=file_path, scheme="silx", data_path="configuration"
raise ValueError("file type not managed, unable to load")
......@@ -32,6 +32,7 @@ __date__ = "07/08/2019"
from .progress import Progress
from est.core.types import XASObject
import logging
import pint
_logger = logging.getLogger(__name__)
......@@ -83,7 +84,7 @@ class Process(object):
if _xas_obj.n_spectrum > 0:
for spectrum in _xas_obj.spectra:
assert isinstance(spectrum, Spectrum)
assert isinstance(, numpy.ndarray)
assert isinstance(, (numpy.ndarray, pint.Quantity))
assert isinstance(, numpy.ndarray)
assert isinstance(_xas_obj, XASObject)
return _xas_obj
......@@ -92,7 +92,8 @@ class TestNxWriting(unittest.TestCase):
self.assertTrue("absorbed_beam" in hdf["scan1"].keys())
self.assertTrue("monochromator" in hdf["scan1"].keys())
loaded_xas_obj = XASObject.from_file(self.h5_file, configuration_path=None)
loaded_xas_obj = XASObject.from_file(self.h5_file,
loaded_xas_obj.absorbed_beam(), self.xas_obj.absorbed_beam()
......@@ -31,6 +31,7 @@ from est.core.types import XASObject, Spectrum
from .process import Process
import logging
import numpy
import pint
_logger = logging.getLogger(__name__)
......@@ -120,9 +121,8 @@ class ROIProcess(Process):
# TODO: create a spectra object which deal automatically with
# the following. Should be part of spectra object
volumes = {}
for key in xas_obj.spectra_keys():
if isinstance(xas_obj.spectra[0][key], numpy.ndarray):
if isinstance(xas_obj.spectra[0][key], (numpy.ndarray, pint.Quantity)):
# there is no processing for the _larch_grp_members case
if key == "_larch_grp_members":
......@@ -178,6 +178,7 @@ class ROIProcess(Process):
assert xas_obj is not None
_xas_obj = self.getXasObject(xas_obj=xas_obj)
# existing roi is priority. This is the case if called from pushworkflow
# for example.
if self._roi is not None:
......@@ -32,6 +32,7 @@ from est.core.types import Dim
from import XASReader
from import DataUrl
from est.core.types import XASObject
from est.units import ur
import numpy
import os
import tempfile
......@@ -126,7 +127,8 @@ class TestSpectraDimensions(unittest.TestCase):
self.assertTrue(isinstance(xas_obj, XASObject))
self.assertTrue(xas_obj.n_spectrum == x_dim * y_dim)
numpy.testing.assert_array_equal(xas_obj.spectra[1].mu, spectra[:, 0, 1])
numpy.testing.assert_array_equal(xas_obj.spectra[2].energy, channel)
(channel * ur.eV).m)
def suite():
......@@ -37,7 +37,9 @@ from est.core.types import Spectrum, XASObject, Dim
from est.core.utils import spectra as spectra_utils
from import read as read_xas
from import DataUrl
from est.units import ur
import json
import pint
......@@ -67,7 +69,7 @@ class TestSpectrum(unittest.TestCase):
energy = numpy.arange(10, 20)
mu = numpy.arange(10)
spectrum = Spectrum(energy=energy, mu=mu)
numpy.testing.assert_array_equal(, energy)
numpy.testing.assert_array_equal(, (energy * ur.eV).m)
numpy.testing.assert_array_equal(, mu)
mu_2 = numpy.arange(30, 40)
spectrum["Mu"] = mu_2
......@@ -184,7 +186,7 @@ class TestXASObjectSerialization(unittest.TestCase):
# make sure we find a comparable xas object from it
xas_obj_2 = XASObject.from_dict(dict_xas_obj)
self.assertEqual(xas_obj, xas_obj_2)
# simple test without the process_details
......@@ -204,6 +206,7 @@ class TestXASObjectSerialization(unittest.TestCase):
self.assertTrue(isinstance(, pint.Quantity))
# if no h5 file defined, should fail to copy it to a dictionary
with self.assertRaises(ValueError):
......@@ -215,8 +218,10 @@ class TestXASObjectSerialization(unittest.TestCase):
# make sure we find a comparable xas object from it
xas_obj_2 = XASObject.from_dict(dict_xas_obj)
self.assertTrue(isinstance(, pint.Quantity))
self.assertEqual(xas_obj, xas_obj_2)
# simple test without the process_details
......@@ -39,7 +39,8 @@ import os
import shutil
from silx.utils.enum import Enum
from import h5py_read_dataset
from est.units import units, ur, convert_to as convert_unit_to
import pint
_logger = logging.getLogger(__name__)
......@@ -178,6 +179,8 @@ class XASObject(object):
def spectra(self, energy_spectra):
energy, spectra, dim1, dim2 = energy_spectra
if isinstance(energy, numpy.ndarray):
energy = energy * ur.eV
if spectra is None:
self.__spectra = []
self.__energy = energy
......@@ -210,7 +213,10 @@ class XASObject(object):
assert isinstance(spectrum, Spectrum)
self._updateSpectraIndexes() = energy
if energy is not None: = convert_unit_to(energy, ur.eV)
else: = None
def _setSpectra(self, spectra):
self.__spectra = spectra
......@@ -260,7 +266,12 @@ class XASObject(object):
def energy(self, energy):
self.__energy = energy
if isinstance(energy, numpy.ndarray):
energy = energy * ur.eV
if energy is None:
self.__energy = None
self.__energy = convert_unit_to(energy, ur.eV)
if len(self.__spectra) > 0:
if len(self.__spectra[0].energy) != len(energy):
_logger.warning("spectra and energy have incoherent dimension")
......@@ -306,7 +317,7 @@ class XASObject(object):
entry = "/".join((self.entry, "est_saving_pt", "channel"))
if entry in h5f:
del h5f[entry]
h5f[entry] = "None" if is None else
h5f[entry] = "None" if is None else convert_unit_to(, ur.eV).m
def to_dict(self, with_process_details=True):
"""convert the XAS object to a dict
......@@ -343,7 +354,6 @@ class XASObject(object):
return DataUrl(
file_path=self.linked_h5_file, data_path=data_path, scheme="silx"
spectra_ = get_spectra_and_processing()
res = {
......@@ -408,6 +418,8 @@ class XASObject(object):
except Exception:
raise KeyError("fail to access to", key)
if isinstance(value, pint.Quantity):
value = convert_unit_to(value, ur.eV).m
if _has_larch and isinstance(value, larch.symboltable.Group):"pass larch details, not managed for now")
......@@ -438,7 +450,7 @@ class XASObject(object):
if isinstance(spectra, str):
spectra = load_data(data_url=DataUrl(path=spectra), name="spectra")
# if come from a list of spectrum
elif not isinstance(spectra, numpy.ndarray):
elif not isinstance(spectra, (numpy.ndarray, pint.Quantity)):
new_spectra = []
for spectrum in spectra:
assert isinstance(spectrum, dict)
......@@ -542,7 +554,8 @@ class XASObject(object):
def __eq__(self, other):
return (
isinstance(other, XASObject)
and numpy.array_equal(, # noqa
and (( is None and is None) or # noqa
numpy.array_equal(, # noqa
and self.dim1 == other.dim1 # noqa
and self.dim2 == other.dim2 # noqa
and self.configuration == other.configuration # noqa
......@@ -762,7 +775,9 @@ class Spectrum(_Spectrum_Base):
def __init__(self, energy=None, mu=None, x=None, y=None):
if energy is not None:
assert isinstance(energy, numpy.ndarray)
assert isinstance(energy, (numpy.ndarray, pint.Quantity))
if isinstance(energy, numpy.ndarray):
energy = energy * ur.eV
self.__x = x
self.__y = y
......@@ -801,8 +816,9 @@ class Spectrum(_Spectrum_Base):
return self.__energy
def energy(self, energy):
assert isinstance(energy, numpy.ndarray) or energy is None
assert isinstance(energy, (numpy.ndarray, pint.Quantity)) or energy is None
self.__energy = energy
......@@ -863,7 +879,7 @@ class Spectrum(_Spectrum_Base):
def normalized_energy(self, energy):
assert isinstance(energy, numpy.ndarray) or energy is None
assert isinstance(energy, (numpy.ndarray, pint.Quantity)) or energy is None
self.__normalized_energy = energy
......@@ -994,7 +1010,7 @@ class Spectrum(_Spectrum_Base):
self._X_POS_KEY: self.x,
self._Y_POS_KEY: self.y,
self._ENERGY_KEY: convert_unit_to(, ur.eV).m,
self._FT_KEY: self.ft.to_dict(),
if self.normalized_mu is None
......@@ -35,6 +35,7 @@ from import utils
from import dicttoh5, h5todict
from import DataUrl
from silx.utils.enum import Enum
from est.units import ur, convert_to
from import read_spectrum as pymca_read_spectrum
......@@ -148,7 +149,8 @@ def load_data(data_url, name):
_logger.warning("invalid url for", name, ", will not load it")
def read_xas(spectra_url, channel_url, dimensions=None, config_url=None):
def read_xas(spectra_url, channel_url, dimensions=None, config_url=None,
Read the given spectra url and the config url if any
......@@ -218,7 +220,7 @@ def read_xas(spectra_url, channel_url, dimensions=None, config_url=None):
raise ValueError(err)
return (spectra, energy, configuration)
return (spectra, energy * energy_unit, configuration)
def write_xas_proc(
......@@ -352,6 +354,7 @@ def write_xas(
nx_entry.attrs["NX_class"] = "NXentry"
# store energy
<<<<<<< HEAD
nx_monochromator = nx_entry.require_group("monochromator")
nx_monochromator.attrs["NX_class"] = "NXmonochromator"
if overwrite and "energy" in nx_monochromator:
......@@ -359,6 +362,16 @@ def write_xas(
nx_monochromator["energy"] = energy
nx_monochromator["energy"].attrs["interpretation"] = "spectrum"
nx_monochromator["energy"].attrs["NX_class"] = "NXdata"
nx_monochromator = nx_entry.require_group('monochromator')
nx_monochromator.attrs['NX_class'] = "NXmonochromator"
if overwrite and 'energy' in nx_monochromator:
del nx_monochromator['energy']
nx_monochromator['energy'] = convert_to(energy, ur.eV).m
nx_monochromator['energy'].attrs['interpretation'] = 'spectrum'
nx_monochromator['energy'].attrs['NX_class'] = "NXdata"
nx_monochromator['energy'].attrs['unit'] = "eV"
>>>>>>> 91be990... Add unit system using pint
# store absorbed beam
nx_absorbed_beam = nx_entry.require_group("absorbed_beam")
......@@ -28,9 +28,10 @@ __license__ = "MIT"
__date__ = "06/11/2019"
from PyMca5.PyMcaIO import specfilewrapper as specfile
from est.units import ur
def read_spectrum(spec_file):
def read_spectrum(spec_file, energy_unit=ur.eV):
:param spec_file: path to the spec file containing the spectra definition
......@@ -66,4 +67,6 @@ def read_spectrum(spec_file):
energy = data[0, :]
mu = data[1, :]
if energy is not None:
energy = energy * energy_unit
return energy, mu
......@@ -39,6 +39,7 @@ def configuration(parent_package="", top_path=None):
return config
# -*- coding: utf-8 -*-
# This file is part of the bliss project
# Copyright (c) 2015-2020 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Physical units for engineering and science
Implementation based on :mod:`pint`.
>>> from bliss.physics.units import ur
>>> # use ur as a pint UnitRegistry
>>> mass = 0.1*
>>> E = mass * ur.c**2
>>> print( )
8987551.78737 kilojoule
>>> # decorate your methods to ensure proper units
>>> @ur.units(mass='kg', result='J')
... def energy(mass):
... return mass * ur.c**2
>>> # passing a Quantity will give you back a Quantity
>>> print( energy(mass) )
8987551787.37 joule
>>> # passing a float will assume it is in the proper (kg)
>>> # and will give you back a float in the corresponding unit (J)
>>> print( energy(0.1e-6) )
Advanced usage
Use another UnitRegistry
Bliss unit system creates a default UnitRegistry (ur).
If you want to interact with another library which also uses pint it is
important that both libraries use the same UnitRegistry.
This library allows you to change the active UnitRegistry. You should do
this as soon as possible in the code of your application::
import pint
ureg = UnitRegistry()
from bliss.physics import units
units.ur = ureg
from functools import wraps
from inspect import getfullargspec
import pint
__all__ = ["ur", "units"]
#: unit registry
ur = pint.UnitRegistry()
def is_quantity(arg):
"""Return whether the given argument is a quantity."""
return isinstance(arg, ur.Quantity)
def to_unit(arg):
"""Permissively cast the given argument into a unit."""
return ur.Unit(arg) if arg else None
def values_to_units(dct):
"""Cast the values of the given dict into units"""
return {k: to_unit(v) for k, v in dct.items()}
def convert_to(arg, unit):
"""Permissively convert the given argument into the given unit.
The argument can either be a number or a quantity.
if not unit:
return arg
if arg is None:
return None
return if is_quantity(arg) else arg * unit
def units(**kwarg_units):
Use as a decorator to protect your function against unit errors.
Each keyword argument must be an argument of the function. The
value is the unit (string or Unit) in which your function argument
should be called with.
An extra argument *result* should provide the Unit for the expected
return value. And yes, you cannot have a function which has an
argument called *result* but that is just good naming!
Missing arguments will be ignored.
from bliss.physics.units import ur, units
@units(mass='kg', result=ur.J)
def energy(mass):
return mass * ur.c**2
When you call a decorated function, it will return a Quantity if the
*result* is a Unit and at least one of the arguments is a Quantity. If none
of the arguments is a Quantity the result is a float with a value in the
units specified by *result*
result_unit = to_unit(kwarg_units.pop("result", None))
kwarg_units = values_to_units(kwarg_units)
def decorator(func):
arg_spec = getfullargspec(func).args
if not set(arg_spec).issuperset(kwarg_units):
raise TypeError("units argument names differ from function argument names")
def wrapper(*args, **kwargs):
# Everything is a kwargs
kwargs.update(zip(arg_spec, args))
# Check for quantity-free use case
all_magnitude = all(
not is_quantity(value)
for key, value in kwargs.items()
if key in kwarg_units
# Kwargs conversion
kwargs = {
key: convert_to(value, kwarg_units.get(key))
for key, value in kwargs.items()
# Call the actual func
result = func(**kwargs)
if not result_unit:
return result
# Safety check
if not is_quantity(result):
raise TypeError("Function {!r} did not return a quantity".format(func))
# Convert the result and return magnitude or quantity
result = convert_to(result, result_unit)
return result.magnitude if all_magnitude else result
return wrapper
return decorator
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment