# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2016-2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "06/12/2019"
import logging
import os
from datetime import datetime

import h5py
import numpy
from silx.io import utils
from silx.io.dictdump import dicttoh5, h5todict
from silx.io.url import DataUrl
from silx.utils.enum import Enum
from est.units import ur
from est.core.types.dim import Dim
from est.core.types import Spectra
from est.io.utils.spec import read_spectrum as read_spec_spectrum
try:
    from est.io.utils.larch import read_ascii as larch_read_ascii
except ImportError:
    has_larch = False
else:
    has_larch = True
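# note: xraylarch is an optional dependency. When it is missing, urls using the
# larch / xmu scheme cannot be read and load_data logs a warning and returns None.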
_logger = logging.getLogger(__name__)
class InputType(Enum):
dat_spectrum = "*.dat" # contains one spectrum
hdf5_spectra = "*.h5" # contains several spectra
xmu_spectrum = "*.xmu" # contains one spectrum
csv_spectrum = "*.csv" # two columns, comma separated
def move_axis_to_standard(spectra, dimensions):
# make sure all dimensions are defined
for dim in Dim:
if dim not in dimensions:
err = "%s is not defined in the dimensions" % dim
raise ValueError(err)
    # reorder the axes so that the data always ends up as (Dim.DIM_2, Dim.DIM_1, Dim.DIM_0)
src_axis = (
dimensions.index(Dim.DIM_2),
dimensions.index(Dim.DIM_1),
dimensions.index(Dim.DIM_0),
)
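    # illustrative example: for dimensions=(Dim.DIM_0, Dim.DIM_1, Dim.DIM_2), i.e.
    # data stored as (X, Y, channel), src_axis is (2, 1, 0) and the result is laid
    # out as (channel, Y, X), matching the default layout used by read_xas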
dst_axis = (0, 1, 2)
_logger.warning("move axis for spectra, from %s to %s" % (src_axis, dst_axis))
if isinstance(spectra, Spectra):
spectra.data = numpy.moveaxis(spectra.data, src_axis, dst_axis)
elif isinstance(spectra, numpy.ndarray):
spectra = numpy.moveaxis(spectra, src_axis, dst_axis)
return spectra
def load_data(data_url, name, dimensions, columns_names=None, energy_unit=ur.eV):
"""
    Load data from an url. Handles the different schemes (silx, fabio, numpy,
    PyMca, xraylarch).
:param data_url: silx DataUrl with path to the data
:type: DataUrl
:param str name: name of the data we want to load. Should be in
('spectra', 'energy', 'configuration')
    :param dimensions: axis order of the stored data, as a tuple of
                       :class:`Dim` values (or values convertible to it)
    :param Union[None,dict] columns_names: names of the columns to pick for .dat
                                           files. Expects the keys 'energy', 'mu',
                                           'monitor' and 'scan_title' to be registered
    :param energy_unit: unit of the energy / channel axis (default: eV)
    :return: data loaded
    :rtype: Union[None,dict,numpy.ndarray]
"""
if data_url is None:
return None
if dimensions is not None:
dimensions = tuple([Dim.from_value(dim) for dim in dimensions])
if data_url.scheme() in ("PyMca", "PyMca5", "Spec", "spec"):
def get_energy_col_name():
if columns_names is not None:
return columns_names["energy"]
if name == "energy" and data_url.data_path() is not None:
return data_url.data_path()
else:
return None
def get_absorption_col_name():
if columns_names is not None:
return columns_names["mu"]
if name == "spectra" and data_url.data_path() is not None:
return data_url.data_path()
else:
return None
energy, mu = read_spec_spectrum(
data_url.file_path(),
energy_col_name=get_energy_col_name(),
absorption_col_name=get_absorption_col_name(),
monitor_col_name=columns_names["monitor"] if columns_names else None,
scan_header_S=columns_names["scan_title"] if columns_names else None,
energy_unit=energy_unit,
)
if name == "spectra":
return mu.reshape(-1, 1, 1)
else:
if has_larch is False:
_logger.warning("Requires larch to load data from " "%s" % data_url.path())
return None
else:
energy, mu = larch_read_ascii(
xmu_file=data_url.file_path(), energy_unit=energy_unit
)
payno
committed
mu = numpy.ascontiguousarray(mu[:])
return mu.reshape(mu.shape[0], 1, 1)
else:
return energy
return move_axis_to_standard(numpy.load(data_url.file_path()), dimensions)
elif data_url.scheme() == "est":
assert name == "spectra"
        spectra = []
        with h5py.File(data_url.file_path(), "r") as hdf5:
            # get all possible entries
            entries = filter(
                lambda x: isinstance(hdf5[x], h5py.Group)
                and "est_saving_pt" in hdf5[x].keys(),
                hdf5.keys(),
            )
            entries = list(entries)
            if len(entries) == 0:
                _logger.error(
                    "no spectra dataset found in the file %s", data_url.file_path()
                )
                return
            if len(entries) > 1:
                _logger.warning(
                    "several entries detected, only one will be loaded: %s", entries[0]
                )
            spectra_path = "/".join((entries[0], "est_saving_pt", "spectra"))
            node_spectra = hdf5[spectra_path]
            spectrum_indexes = list(node_spectra.keys())
            spectrum_indexes = list(map(lambda x: int(x), spectrum_indexes))
            spectrum_indexes.sort()
from est.core.types import Spectrum
for index in spectrum_indexes:
spectrum_path = "/".join((spectra_path, str(index)))
dict_ = h5todict(
h5file=data_url.file_path(), path=spectrum_path, asarray=False
)
spectrum = Spectrum().load_frm_dict(dict_)
spectra.append(spectrum)
return Spectra(energy=spectra[0].energy, spectra=spectra)
else:
if data_url.is_valid():
            try:
                data = utils.get_data(data_url)
            except ValueError as e:
                _logger.error(e)
            else:
                if data.ndim == 1:
                    return data.reshape(data.shape[0], 1, 1)
                elif data.ndim == 3:
                    return move_axis_to_standard(data, dimensions=dimensions)
                else:
                    return data
        else:
            _logger.warning(
                "invalid url for {}: {} will not load it".format(name, data_url)
            )


def read_xas(
    spectra_url,
    channel_url,
    dimensions=None,
    config_url=None,
    energy_unit=ur.eV,
    columns_names=None,
):
"""
Read the given spectra url and the config url if any
    :param Union[DataUrl,str] spectra_url: url to the spectra data
    :param Union[DataUrl,str] channel_url: url to the energy / channel data
    :param dimensions: dimensions of the spectra. If None will be set to the
                       default (channel, Y, X)
    :param Union[DataUrl,None] config_url: url to the process configuration, if any
    :param energy_unit: unit of the energy / channel axis (default: eV)
    :param Union[None,dict] columns_names: names of the columns to pick for .dat
                                           files
    :return: spectra, energy, configuration
"""
def get_url(original_url, name):
url_ = original_url
if type(url_) is str:
            try:
                url_ = DataUrl(path=url_)
            except Exception as e:
                _logger.error("Fail to build a DataUrl from %s: %s", original_url, e)
        if not isinstance(url_, DataUrl):
            raise TypeError("given input for %s is invalid" % name)
        return url_
_spectra_url = get_url(original_url=spectra_url, name="spectra")
_energy_url = get_url(original_url=channel_url, name="energy")
    _config_url = config_url
    if type(_config_url) is str:
        _config_url = None if _config_url == "" else DataUrl(path=_config_url)
if not (_config_url is None or isinstance(_config_url, DataUrl)):
raise TypeError("given input for configuration is invalid")
from est.core.types import Dim # avoid cyclic import
    if dimensions is None:
        dimensions_ = (Dim.DIM_2, Dim.DIM_1, Dim.DIM_0)
else:
dimensions_ = []
for dim in dimensions:
dimensions_.append(Dim.from_value(dim))
spectra = load_data(
_spectra_url,
name="spectra",
dimensions=dimensions_,
columns_names=columns_names,
        energy_unit=energy_unit,
    )
energy = load_data(
_energy_url,
name="energy",
dimensions=dimensions_,
columns_names=columns_names,
energy_unit=energy_unit,
)
configuration = load_data(
_config_url,
name="configuration",
dimensions=dimensions_,
columns_names=columns_names,
        energy_unit=energy_unit,
    )
if energy is None:
raise ValueError("Unable to load energy from {}".format(_energy_url))
if not energy.shape[0] == spectra.shape[0]:
err = "Energy / channel and spectra dim1 have incoherent length (%s vs %s)" % (
energy.shape[0],
spectra.shape[0],
)
        raise ValueError(err)
return spectra, energy * energy_unit, configuration


def write_xas_proc(
    h5_file, entry, process, results, processing_order, data_path="/", overwrite=True
):
"""
Write a xas :class:`.Process` into .h5
:param str h5_file: path to the hdf5 file
:param str entry: entry name
:param process: process executed
:type: :class:`.Process`
:param results: process result data
:type: numpy.ndarray
:param processing_order: processing order of treatment
:type: int
:param data_path: path to store the data
    :type: str
    :param bool overwrite: if True, overwrite existing datasets with the same name
    """
process_name = "xas_process_" + str(processing_order)
# write the xasproc default information
with h5py.File(h5_file, "a") as h5f:
nx_entry = h5f.require_group("/".join((data_path, entry)))
nx_entry.attrs["NX_class"] = "NXentry"
nx_process = nx_entry.require_group(process_name)
        nx_process.attrs["NX_class"] = "NXprocess"
if overwrite:
for key in (
"program",
"version",
"date",
"processing_order",
"class_instance",
"ft",
):
if key in nx_process:
del nx_process[key]
nx_process["program"] = process.program_name()
nx_process["version"] = process.program_version()
nx_process["date"] = datetime.now().replace(microsecond=0).isoformat()
nx_process["processing_order"] = numpy.int32(processing_order)
nx_process["class_instance"] = ".".join((_class.__module__, _class.__name__))
nx_data = nx_entry.require_group("data")
nx_data.attrs["NX_class"] = "NXdata"
nx_data.attrs["signal"] = "data"
        # remember the NXprocess group path: save_key reopens the file later
        nx_process_path = nx_process.name

    if isinstance(results, numpy.ndarray):
        data_ = {"data": results}
    else:
        data_ = results
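    # note: "interpretation" is a NeXus / silx attribute used by viewers such as
    # `silx view` to pick a default visualization ("spectrum" -> curve, "image" -> 2D)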
    def get_interpretation(my_data):
        """Return the hdf5 'interpretation' attribute for this type of data"""
        if isinstance(my_data, numpy.ndarray):
            if my_data.ndim == 1:
                return "spectrum"
            elif my_data.ndim in (2, 3):
                return "image"
        return None
def save_key(key_path, value):
"""Save the given value to the associated path. Manage numpy arrays
and dictionaries"""
# save if is dict
if isinstance(value, dict):
h5_path = "/".join((entry, process_name, key_path))
dicttoh5(
value, h5file=h5_file, h5path=h5_path, overwrite_data=True, mode="a"
)
        else:
            with h5py.File(h5_file, "a") as h5f:
                nx_process = h5f.require_group(nx_process_path)
                try:
                    nx_process[key_path] = value
                except TypeError as e:
                    _logger.error("Unable to write %s, reason is %s", key_path, e)
                else:
                    interpretation = get_interpretation(value)
                    if interpretation:
                        nx_process[key_path].attrs["interpretation"] = interpretation
for key, value in data_.items():
key_path = "/".join(("results", key))
save_key(key_path=key_path, value=value)
if process.getConfiguration() is not None:
h5_path = "/".join((nx_process_path, "configuration"))
dicttoh5(
process.getConfiguration(),
h5file=h5_file,
h5path=h5_path,
overwrite_data=True,
mode="a",
)
def write_xas(
h5_file,
entry,
energy,
mu,
sample=None,
start_time=None,
data_path="/",
title=None,
definition=None,
overwrite=True,
):
"""
    Write raw data in NeXus format
:param str h5_file: path to the hdf5 file
:param str entry: entry name
:param sample: definition of the sample
:type: :class:`.Sample`
:param energy: beam energy (1D)
:type: numpy.ndarray
:param mu: beam absorption (2D)
:type: numpy.ndarray
    :param start_time: start time of the acquisition
    :param str data_path: root path under which the entry is written
    :param str title: experiment title
    :param str definition: experiment definition
    :param bool overwrite: if True, overwrite existing datasets with the same name
"""
with h5py.File(h5_file, "a") as h5f:
nx_entry = h5f.require_group("/".join((data_path, entry)))
nx_entry.attrs["NX_class"] = "NXentry"
# store energy
nx_monochromator = nx_entry.require_group("monochromator")
nx_monochromator.attrs["NX_class"] = "NXmonochromator"
if overwrite and "energy" in nx_monochromator:
del nx_monochromator["energy"]
nx_monochromator["energy"] = energy
nx_monochromator["energy"].attrs["interpretation"] = "spectrum"
nx_monochromator["energy"].attrs["NX_class"] = "NXdata"
nx_absorbed_beam = nx_entry.require_group("absorbed_beam")
nx_absorbed_beam.attrs["NX_class"] = "NXdetector"
if overwrite and "data" in nx_absorbed_beam:
del nx_absorbed_beam["data"]
nx_absorbed_beam["data"] = mu
nx_absorbed_beam["data"].attrs["interpretation"] = "image"
nx_absorbed_beam["data"].attrs["NX_class"] = "NXdata"
nx_sample = nx_entry.require_group("sample")
nx_sample.attrs["NX_class"] = "NXsample"
if overwrite and "name" in nx_sample:
del nx_sample["name"]
nx_sample["name"] = sample.name
nx_data = nx_entry.require_group("data")
nx_data.attrs["NX_class"] = "NXdata"
if overwrite and "energy" in nx_data:
del nx_data["energy"]
nx_data["energy"] = h5py.SoftLink(nx_monochromator["energy"].name)
if overwrite and "absorbed_beam" in nx_data:
del nx_data["absorbed_beam"]
nx_data["absorbed_beam"] = h5py.SoftLink(nx_absorbed_beam["data"].name)
if overwrite and "start_time" in nx_entry:
del nx_entry["start_time"]
nx_entry["start_time"] = start_time
if overwrite and "title" in nx_entry:
del nx_entry["title"]
nx_entry["title"] = title
if overwrite and "definition" in nx_entry:
del nx_entry["definition"]
nx_entry["definition"] = definition
def write_spectrum_saving_pt(h5_file, entry, obj, overwrite=True):
"""Save the current status of an est object
:param str h5_file: path to the hdf5 file
:param str entry: entry name
    :param obj: object to save; expected to be a dict that silx ``dicttoh5``
                can serialize
    """
dicttoh5(obj, h5file=h5_file, h5path=entry, overwrite_data=True, mode="a")
def get_xasproc(h5_file, entry):
"""
Return the list of all NXxasproc existing at the data_path level
:param str h5_file: hdf5 file
    :param str entry: data location

    :return: list of dict, one per NXprocess group found
    :rtype: list
    """
def copy_nx_xas_process(h5_group):
"""copy base information from nx_xas_process"""
res = {}
res["_h5py_path"] = h5_group.name
relevant_keys = (
"program",
"version",
"data",
"parameters",
"processing_order",
"configuration",
"class_instance",
)
from silx.io.dictdump import h5todict
for key in h5_group.keys():
# for now we don't want to copy the numpy array (data)
if key in relevant_keys:
if key == "configuration":
config_path = "/".join((h5_group.name, "configuration"))
res[key] = h5todict(h5_file, config_path, asarray=False)
else:
                    res[key] = h5_group[key][...]
        return res

    res = []
    with h5py.File(h5_file, "r") as h5f:
try:
root_group = h5f[entry]
except KeyError:
_logger.warning(entry + " does not exist in " + h5_file)
else:
for key in root_group.keys():
elmt = root_group[key]
if hasattr(elmt, "attrs") and "NX_class" in elmt.attrs:
if elmt.attrs["NX_class"] == "NXprocess":
nx_xas_proc = copy_nx_xas_process(elmt)
if len(nx_xas_proc) == 0:
_logger.warning(
"one xas process was not readable "
"from the hdf5 file at:" + key
)
else:
res.append(nx_xas_proc)
return res
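
# usage sketch (hypothetical file name): get_xasproc("processed.h5", entry="scan1")
# returns one dict per NXprocess group found under "scan1", with keys such as
# "program", "version", "processing_order", "configuration" and "_h5py_path"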


def _demo_write_xas(h5_file="write_xas_example.h5"):
    """Usage example for the writer API. Helper added for illustration: the
    file name, sample name and random data below are placeholders only."""
    from est.core.process.pymca.normalization import PyMca_normalization
    from est.core.process.pymca.exafs import PyMca_exafs
    from est.core.types import Sample

    if os.path.exists(h5_file):
        os.remove(h5_file)

    sample = Sample(name="example_sample")
    data = numpy.random.rand(256 * 20 * 10).reshape((256, 20, 10))
    process_data = numpy.random.rand(256 * 20 * 10).reshape((256, 20, 10))
    energy = numpy.linspace(start=3.25, stop=3.69, num=256)

    write_xas(h5_file=h5_file, entry="scan1", sample=sample, energy=energy, mu=data)

    process_norm = PyMca_normalization()
    write_xas_proc(
        h5_file=h5_file,
        entry="scan1",
        process=process_norm,
        results=process_data,
        processing_order=1,
    )

    process_exafs = PyMca_exafs()
    process_data2 = numpy.random.rand(256 * 20 * 10).reshape((256, 20, 10))
    write_xas_proc(
        h5_file=h5_file,
        entry="scan1",
        process=process_exafs,
        results=process_data2,
        processing_order=2,
    )

def get_column_name(dat_file):
    """Open a spec .dat file with the silx readers so that its column
    (counter) names can be inspected."""
    from silx.io.spech5 import SpecH5
    from silx.io.specfile import SpecFile

    spec_h5 = SpecH5(dat_file)
    spec_file = SpecFile(dat_file)
    return spec_h5, spec_file
if __name__ == "__main__":
get_column_name("specfiledata_tests.dat")