Commit 982ed8d3 authored by payno's avatar payno
Browse files

add python typing on main classes.

close #16

Signed-off-by: default avatarpayno <>
parent a5c69e1a
......@@ -5,7 +5,6 @@ from pypushflow.representation.scheme.ows_parser import OwsParser
from import read_spectrum
has_read_spectrum = True
except ImportError:
has_read_spectrum = False
......@@ -13,19 +12,23 @@ from est.core.types import Spectrum, XASObject
from import read_xas
import logging
import signal
from pypushflow.representation.scheme.scheme import Scheme
import h5py
has_h5py = True
has_h5py = False
from typing import Union
_logger = logging.getLogger(__name__)
def exec_(scheme, input_=None, output_=None, timeout=None):
def exec_(scheme: Scheme,
input_: Union[str, None, dict] = None,
output_: Union[str, None, dict] = None,
timeout: Union[int, None] = None):
def manage_input():
if input_:
# monkey patch the input file for start nodes if an input is given
......@@ -3,7 +3,7 @@ import sys
import logging
from est.core.types import XASObject
from est.core.reprocessing import get_process_instance_frm_h5_desc
from typing import Union
import h5py
......@@ -16,7 +16,11 @@ _logger = logging.getLogger(__name__)
def exec_(
h5_file, entry="scan1", spectra_path=None, energy_path=None, configuration_path=None
h5_file: str,
entry: str = "scan1",
spectra_path: Union[None, str] = None,
energy_path: Union[None, str] = None,
configuration_path: Union[None, str] = None,
......@@ -47,14 +51,6 @@ def exec_(
return xas_obj
def getinputinfo():
return "xas reprocessing workflow_file.h5"
def getInputFrmHdf5(file_path):
raise NotImplementedError()
def main(argv):
parser = argparse.ArgumentParser(description=__doc__)
......@@ -33,6 +33,7 @@ from .progress import Progress
from est.core.types import XASObject
import logging
import pint
from typing import Iterable
_logger = logging.getLogger(__name__)
......@@ -55,7 +56,7 @@ class Process(object):
self._callbacks = []
def name(self):
def name(self) -> str:
return self._name
def stop(self):
......@@ -75,7 +76,7 @@ class Process(object):
return self._callbacks
def getXasObject(xas_obj):
def getXasObject(xas_obj) -> XASObject:
if isinstance(xas_obj, dict):
_xas_obj = XASObject.from_dict(xas_obj)
......@@ -89,19 +90,19 @@ class Process(object):
assert isinstance(_xas_obj, XASObject)
return _xas_obj
def program_name(self):
def program_name(self) -> str:
"""Name of the program used for this processing"""
raise NotImplementedError("Base class")
def program_version(self):
def program_version(self) -> str:
"""version of the program used for this processing"""
raise NotImplementedError("Base class")
def definition(self):
def definition(self) -> str:
"""definition of the process"""
raise NotImplementedError("Base class")
def getConfiguration(self):
def getConfiguration(self) -> dict:
:return: configuration of the process
......@@ -112,7 +113,7 @@ class Process(object):
return None
def setConfiguration(self, configuration):
def setConfiguration(self, configuration: dict):
# filter configuration from orange widgets
if "__version__" in configuration:
del configuration["__version__"]
......@@ -125,7 +126,7 @@ class Process(object):
self._settings = configuration
def register_process(self, xas_obj, data_keys):
def register_process(self, xas_obj: XASObject, data_keys: Iterable):
Store the current process in the linked h5 file if any,
output data stored will be the one defined by the data_keys
......@@ -32,6 +32,7 @@ from .process import Process
import logging
import numpy
import pint
from typing import Union
_logger = logging.getLogger(__name__)
......@@ -55,26 +56,26 @@ class _ROI(object):
self.size = size
def origin(self):
def origin(self) -> tuple:
return self.__origin
def origin(self, origin):
def origin(self, origin: Union[list, tuple]):
self.__origin = int(origin[0]), int(origin[1])
def size(self):
def size(self) -> tuple:
return self.__size
def size(self, size):
def size(self, size: Union[list, tuple]):
self.__size = int(size[0]), int(size[1])
def to_dict(self):
def to_dict(self) -> dict:
return {"origin": self.origin, "size": self.size}
def from_dict(ddict):
def from_dict(ddict: dict):
return _ROI(origin=ddict["origin"], size=ddict["size"])
......@@ -89,11 +90,11 @@ class ROIProcess(Process):
Process.__init__(self, name="roi")
self._roi = None
def set_properties(self, properties):
def set_properties(self, properties: dict):
if "roi" in properties:
self._roi = _ROI.from_dict(properties["roi"])
def setRoi(self, origin, size):
def setRoi(self, origin: Union[list, tuple], size: Union[list, tuple]):
self._roi = _ROI(origin=origin, size=size)
def _apply_roi(self, xas_obj, roi):
......@@ -168,7 +169,7 @@ class ROIProcess(Process):
xas_obj.force_dims(dim1=roi.size[1], dim2=roi.size[0])
return xas_obj
def process(self, xas_obj):
def process(self, xas_obj: Union[dict, XASObject]):
:param xas_obj: object containing the configuration and spectra to process
......@@ -37,10 +37,14 @@ import h5py
import tempfile
import os
import shutil
import pint
from est.units import ur
from est.units import units, convert_to as convert_unit_to
from silx.utils.enum import Enum
from import h5py_read_dataset
from est.units import units, ur, convert_to as convert_unit_to
import pint
from typing import Union
from typing import Iterable
_logger = logging.getLogger(__name__)
......@@ -111,15 +115,15 @@ class XASObject(object):
configuration: Union[dict, None] = None,
dim1: Union[int, None] = None,
dim2: Union[int, None] = None,
name: str = "scan1",
keep_process_flow: bool = True,
spectra_url: Union[DataUrl, None] = None,
energy_url: Union[DataUrl, None] = None,
check_energy: bool = True,
if isinstance(energy, numpy.ndarray):
# automatic energy unit guess. convert energy provided in keV to eV
......@@ -152,44 +156,45 @@ class XASObject(object):
def entry(self):
def entry(self) -> str:
return self.__entry_name
# TODO: fix me: should return a tuple
def spectra(self):
def spectra(self) -> list:
return self.__spectra
def spectra_url(self):
def spectra_url(self) -> Union[None, DataUrl]:
"""Url from where the spectra is available.
Used for object serialization"""
return self.__spectra_url
def spectra_url(self, url):
def spectra_url(self, url: DataUrl):
self.__spectra_url = url
def spectra_url_dims(self):
def spectra_url_dims(self) -> tuple:
"""used to interpret the spectra_url if any"""
return self.__spectra_url_dims
def spectra_url_dims(self, dims):
def spectra_url_dims(self, dims: tuple):
self.__spectra_url_dims = dims
def energy_url(self):
def energy_url(self) -> Union[DataUrl, None]:
"""Url from where the energy is available.
Used for object serialization"""
return self.__energy_url
def energy_url(self, url):
def energy_url(self, url: DataUrl):
self.__energy_url = url
def spectra(self, energy_spectra):
def spectra(self, energy_spectra: tuple):
energy, spectra, dim1, dim2 = energy_spectra
if isinstance(energy, numpy.ndarray):
energy = energy * ur.eV
......@@ -230,10 +235,10 @@ class XASObject(object):
else: = None
def _setSpectra(self, spectra):
def _setSpectra(self, spectra: list):
self.__spectra = spectra
def get_spectrum(self, dim1_idx, dim2_idx):
def get_spectrum(self, dim1_idx: int, dim2_idx: int) -> spectra:
"""Util function to access the spectrum at dim1_idx, dim2_idx"""
assert dim1_idx < self.dim1
assert dim2_idx < self.dim2
......@@ -246,7 +251,7 @@ class XASObject(object):
def dim1(self):
def dim1(self) -> int:
return self.__dim1
def force_dims(self, dim1: int, dim2: int):
......@@ -269,13 +274,18 @@ class XASObject(object):
self.get_spectrum(dim1_idx=y, dim2_idx=x)._force_indexes(x=x, y=y)
def dim2(self):
def dim2(self) -> int:
return self.__dim2
def energy(self):
def energy(self) -> numpy.ndarray:
"""energy as a numpy array in eV"""
return self.__energy
def energy_with_unit(self) -> pint.Quantity:
return self.__energy * ur.eV
def energy(self, energy):
if isinstance(energy, numpy.ndarray):
......@@ -289,11 +299,11 @@ class XASObject(object):
_logger.warning("spectra and energy have incoherent dimension")
def configuration(self):
def configuration(self) -> dict:
return self.__configuration
def configuration(self, configuration):
def configuration(self, configuration: dict):
assert configuration is None or isinstance(configuration, dict)
self.__configuration = configuration or {}
......@@ -335,7 +345,7 @@ class XASObject(object):
else convert_unit_to(, ur.eV).m
def to_dict(self, with_process_details=True):
def to_dict(self, with_process_details=True) -> dict:
"""convert the XAS object to a dict
By default made to simply import raw data.
......@@ -386,13 +396,14 @@ class XASObject(object):
return res
def absorbed_beam(self):
def absorbed_beam(self) -> numpy.ndarray:
return XASObject._spectra_volume(
spectra=self.spectra, key="mu", dim_1=self.dim1, dim_2=self.dim2
def _spectra_volume(spectra, key, dim_1, dim_2, relative_to="energy"):
def _spectra_volume(spectra: list, key: str, dim_1: int, dim_2: int,
relative_to: str = "energy"):
"""Convert a list of spectra (mu) to a numpy array.
..note: only convert raw data for now"""
......@@ -451,7 +462,7 @@ class XASObject(object):
return array.reshape(dim_1, dim_2)
def load_frm_dict(self, ddict):
def load_frm_dict(self, ddict: dict):
"""load XAS values from a dict"""
from import load_data # avoid cyclic import
......@@ -502,7 +513,7 @@ class XASObject(object):
return self
def from_dict(ddict):
def from_dict(ddict: dict):
return XASObject().load_frm_dict(ddict=ddict)
......@@ -537,7 +548,7 @@ class XASObject(object):
return XASObject(spectra=sp, energy=en, configuration=conf)
def dump(self, h5_file):
def dump(self, h5_file: str):
"""dump the XAS object to a file_path within the Nexus format"""
from import XASWriter # avoid cyclic import
......@@ -592,24 +603,25 @@ class XASObject(object):
return True
def n_spectrum(self):
def n_spectrum(self) -> int:
"""return the number of spectra"""
if self.__spectra is None:
return 0
return len(self.__spectra)
def spectra_keys(self):
"""keys contained by the spectrum object (energy, mu, normalizedmu...)"""
def spectra_keys(self) -> tuple:
"""keys contained by the spectrum object (energy, mu, normalizedmu...)
if len(self.spectra) > 0:
assert isinstance(self.spectra[0], Spectrum)
return self.spectra[0].keys()
return tuple(self.spectra[0].keys())
def linked_h5_file(self):
return self.__h5_file
def link_to_h5(self, h5_file):
def link_to_h5(self, h5_file: str):
Associate a .h5 file to the XASObject. This can be used for storing
process flow.
......@@ -619,14 +631,14 @@ class XASObject(object):
self.__h5_file = h5_file
def has_linked_file(self):
def has_linked_file(self) -> bool:
return self.__h5_file is not None
def get_next_processing_index(self):
def get_next_processing_index(self) -> int:
self.__processing_index += 1
return self.__processing_index
def register_processing(self, process, data):
def register_processing(self, process, data) -> None:
Register one process for the current xas object. This require to having
link a h5file to this object
......@@ -647,7 +659,7 @@ class XASObject(object):
def get_process_flow(self):
def get_process_flow(self) -> dict:
:return: the dict of process information
......@@ -704,7 +716,7 @@ class XASObject(object):
return get_ordered_process(recognized_process)
def clean_process_flow(self):
def clean_process_flow(self) -> None:
Remove existing process flow
......@@ -720,7 +732,7 @@ class XASObject(object):
for index, process_ in process_flow.items():
del h5f[process_["_h5py_path"]]
def copy_process_flow_to(self, h5_file_target):
def copy_process_flow_to(self, h5_file_target: str) -> None:
copy all the recognized process from self.__h5_file to h5_file_target
......@@ -791,7 +803,11 @@ class Spectrum(_Spectrum_Base):
_Y_POS_KEY = "YPos"
def __init__(self, energy=None, mu=None, x=None, y=None):
def __init__(self,
energy: Union[None, numpy.ndarray] = None,
mu: Union[None, numpy.ndarray] = None,
x: Union[None, int] = None,
y: Union[None, int] = None):
if energy is not None:
assert isinstance(energy, (numpy.ndarray, pint.Quantity))
......@@ -831,7 +847,7 @@ class Spectrum(_Spectrum_Base):
def energy(self):
def energy(self) -> Union[None, numpy.ndarray]:
return self.__energy
......@@ -841,96 +857,96 @@ class Spectrum(_Spectrum_Base):
self.__energy = energy
def mu(self):
def mu(self) -> Union[None, numpy.ndarray]:
return self.__mu
def mu(self, mu):
def mu(self, mu: numpy.ndarray):
assert isinstance(mu, numpy.ndarray) or mu is None
self.__mu = mu
def x(self):
def x(self) -> Union[None, int]:
return self.__x
def y(self):
def y(self) -> Union[None, int]:
return self.__y
def chi(self):
def chi(self) -> Union[None, numpy.ndarray]:
return self.__chi
def chi(self, chi):
def chi(self, chi: numpy.ndarray):
self.__chi = chi
def k(self):
def k(self) -> Union[None, numpy.ndarray]:
return self.__k_values
def k(self, k):
def k(self, k: numpy.ndarray):
self.__k_values = k
def normalized_mu(self):
def normalized_mu(self) -> Union[None, numpy.ndarray]:
return self.__normalized_mu
def normalized_mu(self, mu):
def normalized_mu(self, mu: numpy.ndarray):
assert isinstance(mu, numpy.ndarray) or mu is None
self.__normalized_mu = mu
def norm(self):
def norm(self) -> Union[None, numpy.ndarray]:
# this alias is needed for larch
return self.__normalized_mu
def norm(self, value):
def norm(self, value: numpy.ndarray):
# this alias is needed for larch
self.__normalized_mu = value
def normalized_energy(self):
def normalized_energy(self) -> Union[None, numpy.ndarray]:
return self.__normalized_energy
def normalized_energy(self, energy):
def normalized_energy(self, energy: Union[numpy.ndarray, pint.Quantity]):
assert isinstance(energy, (numpy.ndarray, pint.Quantity)) or energy is None
self.__normalized_energy = energy
def pre_edge(self):
def pre_edge(self) -> Union[None, numpy.ndarray]:
return self.__pre_edge
def pre_edge(self, value):
def pre_edge(self, value: numpy.ndarray):
self.__pre_edge = value
def _edge(self):
def _edge(self) -> Union[None, numpy.ndarray]:
return self.edge
def _edge(self, value):
def _edge(self, value: numpy.ndarray):
self.edge = value
def post_edge(self):
def post_edge(self) -> Union[None, numpy.ndarray]:
return self.__post_edge
def post_edge(self, value):
def post_edge(self, value: numpy.ndarray):
self.__post_edge = value
def e0(self):
def e0(self) -> Union[None, numpy.ndarray]:
return self.__e0
def e0(self, e0):
def e0(self, e0: numpy.ndarray):
self.__e0 = e0
......@@ -945,7 +961,7 @@ class Spectrum(_Spectrum_Base):
self.__ft = _FT(ddict=ft)