Commit f70463c7 authored by Henri Payno's avatar Henri Payno
Browse files

clear process registration and h5 file link mechanism

* process registration was used to keep track of each process and dump results. This will be done at another level in the future (probably by ewoks)
   * remove `Process.register_process` function (and call to it)
   * remove `_NexusDatasetDef` class which was used to define which fields must be saved for a process
* remove saving_point widget: it was using the registered processes and was needed for large numbers of spectra. This is not the use case today and if we want to have such a mechanism it will be done another way in the future.
* remove `write_process` parameter from `XASWriter.dump_xas`
* XASObject:
   * remove `keep_process_flow` from constructor
   * remove `link_to_h5`, `has_linked_file`, `get_next_processing_index`, `register_processing`, `get_process_flow`, `clean_process_flow` and `copy_process_flow_to` functions
   * `to_dict`: remove `with_process_details` parameter and link mechanism
* update unit test
parent ebc7e551
Pipeline #81004 passed with stages
in 8 minutes and 38 seconds
......@@ -310,11 +310,6 @@ class AnalysisThread(threading.Thread):
legend="mean noise_savgol",
color="red",
)
#
# # print("Raw data average noise: {:.5E}".format(numpy.mean(spectrum.noise_savgol)))
# # print("edge step {:.3f}".format(spectrum.edge_step))
# # print("normalized noise {:.5E}".format(spectrum.norm_noise_savgol))
#
def main():
......
......@@ -5,7 +5,7 @@ from typing import Optional, Union
import h5py
from silx.io.url import DataUrl
from est.io import read_xas, write_xas, get_xasproc
from est.io import read_xas, write_xas
from est.core.types import XASObject
from est.core.types import dimensions as dimensions_mod
from est.units import ur
......@@ -175,14 +175,12 @@ class XASWriter:
if "_output_file_setting" in properties:
self._output_file = properties["_output_file_setting"]
def dump_xas(self, xas_obj, write_process=True):
def dump_xas(self, xas_obj):
"""
write the XASObject into a hdf5 file.
:param xas_obj: object to be stored
:type: Union[:class:`.XASObject`,dict]
:param bool write_process: if True then store the process flow in the
same file.
"""
if isinstance(xas_obj, dict):
_xas_obj = XASObject.from_dict(xas_obj)
......@@ -205,14 +203,4 @@ class XASWriter:
entry=_xas_obj.entry,
)
if write_process is True and xas_obj._keep_process_flow:
if len(get_xasproc(self._output_file, entry=_xas_obj.entry)) > 0:
_logger.warning(
"removing previous process registred. They are no " "more accurate"
)
_xas_obj.clean_process_flow()
# write process flow
_xas_obj.copy_process_flow_to(self._output_file)
__call__ = dump_xas
......@@ -50,7 +50,6 @@ class EnergyROIProcess(
self._apply_roi(_xas_obj, energy_roi.get("minE"), energy_roi.get("maxE"))
self.progress = 100.0
# self.register_process(_xas_obj, data_keys=("Mu", "energy"))
self.outputs.xas_obj = _xas_obj
def _apply_roi(self, xas_obj, emin, emax):
......
......@@ -2,8 +2,6 @@
from est.core.types import Spectrum, XASObject
from est.core.process.process import Process
# from est.core.process.process import _NexusDatasetDef
from larch.xafs.autobk import autobk
from larch.symboltable import Group
import logging
......@@ -122,7 +120,6 @@ class Larch_autobk(
self.progress = 0
self._pool_process(xas_obj=_xas_obj)
self.progress = 100
# self.register_process(_xas_obj, data_keys=(_NexusDatasetDef("bkg"),))
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -2,8 +2,6 @@
from est.core.types import Spectrum, XASObject
from est.core.process.process import Process
# from est.core.process.process import _NexusDatasetDef
from larch.xafs.mback import mback
from larch.symboltable import Group
import logging
......@@ -105,10 +103,6 @@ class Larch_mback(
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self.progress = 100.0
# self.register_process(
# _xas_obj,
# data_keys=(_NexusDatasetDef("normalized_mu"),),
# )
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -2,8 +2,6 @@
from est.core.types import Spectrum, XASObject
from est.core.process.process import Process
# from est.core.process.process import _NexusDatasetDef
from larch.xafs.mback import mback_norm
from larch.symboltable import Group
import logging
......@@ -109,13 +107,6 @@ class Larch_mback_norm(
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self.progress = 100.0
# data_keys = [
# _NexusDatasetDef("mback_mu"),
# _NexusDatasetDef("norm_mback"),
# ]
# if _xas_obj.n_spectrum > 0 and hasattr(_xas_obj.spectra[0], "edge_step"):
# data_keys += ["norm"]
# self.register_process(_xas_obj, data_keys=data_keys)
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -2,9 +2,6 @@
from est.core.types import Spectrum, XASObject
from est.core.process.process import Process
# from est.core.process.process import _NexusDatasetDef
# from est.core.utils.symbol import MU_CHAR
from larch.xafs.pre_edge import pre_edge
import logging
from larch.symboltable import Group
......@@ -119,27 +116,6 @@ class Larch_pre_edge(
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self.progress = 100.0
# self.register_process(
# _xas_obj,
# data_keys=(
# _NexusDatasetDef(
# "flatten_mu", units="{}(E)".format(MU_CHAR), units_latex=r"\mu(E)"
# ),
# _NexusDatasetDef("mu"),
# _NexusDatasetDef("normalized_mu"),
# _NexusDatasetDef("post_edge"),
# _NexusDatasetDef("pre_edge"),
# _NexusDatasetDef("edge_step"),
# _NexusDatasetDef("e0", "eV"),
# _NexusDatasetDef(
# "Mu", units="{}(E)".format(MU_CHAR), units_latex=r"\mu(E)"
# ),
# _NexusDatasetDef("energy", "eV"),
# _NexusDatasetDef("I0"),
# _NexusDatasetDef("I1"),
# _NexusDatasetDef("I2"),
# ),
# )
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -2,11 +2,7 @@
from est.core.types import Spectrum, XASObject
from est.core.process.process import Process
# from est.core.process.process import _NexusDatasetDef
from larch.xafs.xafsft import xftf
# from est.core.utils.symbol import ANGSTROM_CHAR
from larch.symboltable import Group
import logging
import numpy
......@@ -141,50 +137,6 @@ class Larch_xftf(
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
# self.register_process(
# _xas_obj,
# data_keys=(
# _NexusDatasetDef(
# "chir_im",
# units="{}^(-3)".format(ANGSTROM_CHAR),
# units_latex=r"\mathring{A}^{-3}",
# ),
# _NexusDatasetDef("chir_re"),
# _NexusDatasetDef(
# "chir_mag",
# units="{}^(-3)".format(ANGSTROM_CHAR),
# units_latex=r"\mathring{A}^{-3}",
# ),
# _NexusDatasetDef(
# "masked_chir_mag",
# units="{}".format(ANGSTROM_CHAR),
# units_latex=r"\mathring{A}",
# ),
# _NexusDatasetDef(
# "r", units="{}".format(ANGSTROM_CHAR), units_latex=r"\mathring{A}"
# ),
# _NexusDatasetDef(
# "masked_r",
# units="{}".format(ANGSTROM_CHAR),
# units_latex=r"\mathring{A}",
# ),
# _NexusDatasetDef(
# "k",
# units="{}^(-1)".format(ANGSTROM_CHAR),
# units_latex=r"\mathring{A}^{-1}",
# ),
# _NexusDatasetDef(
# "masked_k",
# units="{}^(-1)".format(ANGSTROM_CHAR),
# units_latex=r"\mathring{A}^{-1}",
# ),
# _NexusDatasetDef(
# "masked_chi_weighted_k",
# units="{}^(-2)".format(ANGSTROM_CHAR),
# units_latex=r"\mathring{A}^{-2}",
# ),
# ),
# )
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
......
from est.core.types import XASObject
from .process import Process
# from .process import _NexusDatasetDef
import scipy.signal
import logging
import numpy
......@@ -114,16 +112,6 @@ class NoiseProcess(
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self.progress = 100.0
# self.register_process(
# _xas_obj,
# data_keys=(
# _NexusDatasetDef("norm_noise_savgol"),
# _NexusDatasetDef("noise_savgol", units="raw data noise"),
# _NexusDatasetDef("noise_savgol_energy", units="eV"),
# _NexusDatasetDef("edge_step"),
# ),
# )
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
......
"""module for process base class"""
import logging
from typing import Iterable
from ewokscore.taskwithprogress import TaskWithProgress as Task
from est.core.types import XASObject
from .progress import Progress
from ..utils import extract_properties_from_dict
......@@ -13,66 +10,6 @@ from ... import __version__
_logger = logging.getLogger(__name__)
class _NexusDatasetDef:
"""Util function to define a Nexus plot"""
def __init__(self, name: str, units=None, units_latex=None):
self.__name = None
self.__units = None
self.__units_latex = None
assert isinstance(name, str)
self.name = name
self.units = units
self.units_latex = units_latex
@property
def name(self):
return self.__name
@name.setter
def name(self, name):
if not isinstance(name, str):
raise TypeError("name should be an instance of str")
else:
self.__name = name
@property
def units(self):
return self.__units
@units.setter
def units(self, units):
if not isinstance(units, (str, type(None))):
raise TypeError("units should be an instance of str")
else:
self.__units = units
@property
def units_latex(self):
return self.__units_latex
@units_latex.setter
def units_latex(self, units):
if not isinstance(units, (str, type(None))):
raise TypeError("units should be an instance of str")
else:
self.__units_latex = units
@property
def attrs(self):
attrs = {}
if self.units is not None:
attrs.update({"units": self.units})
if self.units_latex is not None:
attrs.update({"units_latex": self.units_latex})
return attrs
def __str__(self):
return self.name
class Process(Task, register=False):
def __init__(self, **kwargs):
super().__init__(**kwargs)
......@@ -165,119 +102,6 @@ class Process(Task, register=False):
self._settings = configuration
def register_process(self, xas_obj: XASObject, data_keys: Iterable):
    """
    Store the current process in the linked h5 file if any.
    The output data stored will be the one defined by ``data_keys``.

    :param xas_obj: object for which we want to save the treatment
    :type: :class:`.XASObject`
    :param Iterable data_keys: keys of the datasets to save; each item is
        either a str or an instance of :class:`_NexusDatasetDef`
    """
    if not xas_obj.has_linked_file():
        # nothing to register to: no h5 file linked to the XASObject
        return

    # keys whose arrays are mapped relative to the energy axis
    _ENERGY_KEYS = (
        "flat",
        "fpp",
        "f2",
        "dmude",
        "norm",
        "norm_area",
        "post_edge",
        "pre_edge",
        "bkg",
        "energy",
        "mback_mu",
        "norm_mback",
        "Energy",
        "Mu",
        "mu",
        "normalized_energy",
        "normalized_mu",
        "flatten_mu",
        "mu_ref",
        "I0",
        "I1",
        "I2",
    )

    _data = {}
    for data_info in data_keys:
        # data_info can be a string or an instance of _NexusDatasetDef
        key = str(data_info)
        # The following conditions decide against which parameter the
        # array must be associated, i.e. what the output of map_to must
        # look like before dumping it. This complex mapping has no real
        # rationale (except that it was done this way on the prototype).
        # TODO: remove it and simplify this section
        relative_to = None
        use = "map_to"
        if key in _ENERGY_KEYS:
            relative_to = "energy"
        elif key in ("noise_savgol", "noise_savgol_energy"):
            relative_to = "noise_savgol_energy"
        elif key in ("chir_re", "chir_im", "chir_mag", "r"):
            relative_to = "r"
        elif key in ("masked_chir_mag", "masked_r"):
            relative_to = "masked_r"
        elif key in ("ft.radius", "ft.intensity", "ft.imaginary"):
            relative_to = "radius"
            use = "_list_res_ft"
        elif key in ("masked_chi_weighted_k", "masked_k"):
            relative_to = "masked_k"
        elif key in ("chi", "k", "chi_weighted_k"):
            relative_to = "k"

        if use == "map_to":
            if key == "norm_area":
                # norm_area cannot be mapped to an array: skip it
                continue
            # store the result as a numpy 3d array when possible.
            # bug fix: a duplicate, unguarded map_to call previously ran
            # before this try block, so a missing parameter raised
            # KeyError instead of being logged as intended.
            try:
                _data[key] = xas_obj.spectra.map_to(
                    data_info=key,
                    relative_to=relative_to,
                )
            except KeyError:
                _logger.error(
                    "%s: unable to store %s, parameter not found",
                    self.name,
                    key,
                )
        elif use == "_list_res_ft":
            # for ft we are forced to use a dictionary because the
            # processing can end with a nan or with a numpy array.
            # Combining both would make us end up with an ndarray of
            # object type.
            key_ = key.split(".")[-1]
            res = {}
            for spectrum in xas_obj.spectra:
                x, y = spectrum.x, spectrum.y
                res["_".join((str(x), str(y)))] = getattr(spectrum.ft, key_)
            _data[key] = res
        else:
            # defensive: 'use' is only ever set to the two modes above
            raise ValueError("unhandled storage mode: %s" % use)

        if isinstance(data_info, _NexusDatasetDef):
            # propagate dataset attributes (units, units_latex, ...)
            for aname, avalue in data_info.attrs.items():
                _data[(key, aname)] = avalue
    xas_obj.register_processing(process=self, results=_data)
def addCallback(self, callback):
    """Append *callback* to the list of callables kept on this process.

    :param callback: callable to be stored; invocation timing is handled
        elsewhere (not visible from this block).
    """
    self._callbacks.append(callback)
......
......@@ -135,13 +135,6 @@ class PyMca_exafs(
self._pool_process(xas_obj=_xas_obj)
self.progress = 100.0
self._advancement.endProcess()
# self.register_process(
# _xas_obj,
# data_keys=(
# _NexusDatasetDef("k"),
# _NexusDatasetDef("post_edge"),
# ),
# )
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -4,8 +4,6 @@ import logging
import numpy
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from est.core.process.process import Process
# from est.core.process.process import _NexusDatasetDef
from est.core.types import XASObject, Spectrum
_logger = logging.getLogger(__name__)
......@@ -153,14 +151,6 @@ class PyMca_ft(
assert hasattr(_xas_obj.spectra.data.flat[0], "ft")
assert hasattr(_xas_obj.spectra.data.flat[0].ft, "intensity")
assert hasattr(_xas_obj.spectra.data.flat[0].ft, "imaginary")
# self.register_process(
# _xas_obj,
# data_keys=(
# _NexusDatasetDef("ft.radius"),
# _NexusDatasetDef("ft.intensity"),
# _NexusDatasetDef("ft.imaginary"),
# ),
# )
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -157,7 +157,6 @@ class PyMca_k_weight(
self.progress = 0.0
self._pool_process(xas_obj=_xas_obj)
self.progress = 100.0
# self.register_process(xas_obj=_xas_obj, data_keys=tuple())
self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj):
......
......@@ -3,8 +3,6 @@
import logging
from PyMca5.PyMcaPhysics.xas.XASClass import XASClass
from est.core.process.process import Process
# from est.core.process.process import _NexusDatasetDef
from est.core.types import Spectrum, XASObject
_logger = logging.getLogger(__name__)
......@@ -124,14 +122,7 @@ class PyMca_normalization(
self.progress = 100.0
if xas_obj.normalized_energy is None:
raise ValueError("Fail to compute normalize energy")
# self.register_process(
# xas_obj,
# data_keys=(
# _NexusDatasetDef("normalized_energy"),
# _NexusDatasetDef("normalized_mu"),
# _NexusDatasetDef("post_edge"),
# ),
# )
self.outputs.xas_obj = xas_obj
def _pool_process(self, xas_obj):
......
import copy
import logging
import tempfile
import os
import shutil
from typing import Optional, Union
import pint
import numpy
import h5py
from silx.io.url import DataUrl
from silx.io.utils import h5py_read_dataset
from silx.io.h5py_utils import File as HDF5File
import est.io
from est.io.utils import get_data
......@@ -41,10 +35,6 @@ class XASObject:
:param int dim1: first dimension of the spectra
:param int dim2: second dimension of the spectra
:param str name: name of the object. Will be used for the hdf5 entry
:param bool keep_process_flow: if True then will keep the trace of the set
of process applied to the XASObject into a
hdf5 file.
This is also used for the
:param spectra_url: path to the spectra data if any. Used when serializing
the XASObject. Won't read it from it.
:type: Union[None,str]
......@@ -66,7 +56,6 @@ class XASObject:
dim1: Union[int, None] = None,
dim2: Union[int, None] = None,
name: str = "scan1",
keep_process_flow: bool = False,
spectra_url: Union[DataUrl, None] = None,
spectra_url_dims: Optional[dimensions_mod.DimensionsType] = None,
energy_url: Union[DataUrl, None] = None,
......@@ -90,20 +79,12 @@ class XASObject:
self.__spectra = Spectra(energy=energy)
self.__dim1 = None
self.__dim2 = None
self.__processing_index = 0
self.__h5_file = None
self.__entry_name = name
self.__spectra_url = spectra_url
self.__spectra_url_dims = spectra_url_dims
self.__energy_url = energy_url
self.spectra = (energy, spectra, dim1, dim2)
self.configuration = configuration
self._keep_process_flow = keep_process_flow
if keep_process_flow is True:
self.__h5_file = os.path.join(
tempfile.mkdtemp(), "_".join((name, "_flow.h5"))
)
self.link_to_h5(self.__h5_file)
def attach_3d_array(self, attr_name, attr_value):
"""
......@@ -212,42 +193,7 @@ class XASObject:
assert configuration is None or isinstance(configuration, dict)
self.__configuration = configuration or {}
def _create_saving_pt(self):
if not self.has_linked_file():
raise ValueError(
"there is not where to create a saving pt"
"(no h5 file linked to the XASObject)"
)
else:
def get_list_spectra():
res = {}
for i_spectrum, spectrum in enumerate(self.spectra.data.flat):
res[i_spectrum] = spectrum.to_dict()
return res
from est.io import write_spectrum_saving_pt
# save spectra
entry = "/".join((self.entry, "est_saving_pt", "spectra"))
for i_spectrum, spectrum in get_list_spectra().items():
spectrum = {k: v for k, v in spectrum.items() if not k.startswith("_")}
path = "/".join((entry, str(i_spectrum)))
write_spectrum_saving_pt(
h5_file=self.linked_h5_file,
entry=path,
obj=spectrum,
overwrite=True,
)
# save channel
with HDF5File(self.linked_h5_file, "a") as h5f:
entry = "/".join((self.entry, "est_saving_pt", "channel"))
if entry in h5f:
del h5f[entry]