# This file has been generated automatically using
# pypushflow 0.2.0-rc1 and est 0.3.0-dev1
import est.core.process.io
import pypushflow.utils
import est.core.process.larch.pre_edge
import est.core.process.larch.autobk
import est.core.process.larch.xftf
import est.core.process.noise
import logging

# `trigger` relies on this name to recognise pass-through processes.
from pypushflow.utils import IgnoreProcess

_logger = logging.getLogger(__name__)

# Maps (source_process, source_channel) -> list of (sink_process, handler)
# tuples; filled by `connect` and walked by `trigger`.
connections = {}


def get_output_channel_name(class_inst, output):
    """Return the name of the output channel of `class_inst` matching `output`.

    The first entry of `class_inst.outputs` whose declared type `output` is
    an instance of wins.

    :param class_inst: process instance exposing an `outputs` sequence whose
                       items start with (name, type, handler)
    :param output: object produced by the process
    :return: the matching channel name, or None if no declared type matches
    """
    assert hasattr(class_inst, "outputs")
    for output_ in class_inst.outputs:
        output_name, output_type_, _output_handler = output_[:3]
        if isinstance(output, output_type_):
            return output_name
    return None


def get_handler(class_inst, channel_name):
    """Find the associate handler for channel `channel_name`

    :param class_inst: process instance; when it has no `inputs` attribute
                       no handler can be found
    :param channel_name: name of the input channel to look up
    :return: the handler declared for that input channel, or None
    """
    for input_ in getattr(class_inst, "inputs", ()):
        input_name, _input_type, input_handler = input_[:3]
        if input_name == channel_name:
            return input_handler
    return None


def connect(source_process, source_channel, sink_process, sink_channel):
    """Register for each cuple process / outputtype the process and handler
    to launch when processes end

    :param source_process: process whose output is forwarded
    :param source_channel: name of the output channel of `source_process`
    :param sink_process: process receiving the data
    :param sink_channel: name of the input channel of `sink_process`
    :raises TypeError: if `sink_process` declares no input named
                       `sink_channel`
    """
    handler = get_handler(sink_process, channel_name=sink_channel)
    if handler is None:
        # The exception used to be *returned*, which silently dropped the
        # link; raising makes a mis-wired workflow fail loudly.
        raise TypeError(
            "{} channel is not managed by {}".format(sink_channel, sink_process)
        )
    connections.setdefault((source_process, source_channel), []).append(
        (sink_process, handler)
    )


def trigger(process, input_data, channel_name):
    """Run `process` on `input_data` and propagate the result downstream.

    An `IgnoreProcess` forwards its input unchanged on the same channel.
    Any other process is executed through the handler declared for
    `channel_name`; the output channel is then deduced from the type of the
    returned object. Every sink registered with `connect` for that
    (process, output channel) pair is triggered recursively.

    :param process: process instance to execute
    :param input_data: object handed to the process handler
    :param channel_name: name of the input channel receiving `input_data`
    :raises ValueError: if `process` has no handler for `channel_name`
    """
    _logger.info(
        "{} has been trigger on channel {} with input {}"
        "".format(process, channel_name, input_data)
    )
    if isinstance(process, IgnoreProcess):
        output_data = input_data
        output_channel_name = channel_name
    else:
        handler = get_handler(class_inst=process, channel_name=channel_name)
        if handler is None:
            raise ValueError(
                "Fail to find handler for {} on channel {}"
                "".format(process, channel_name)
            )
        assert hasattr(process, handler)
        output_data = getattr(process, handler)(input_data)
        output_channel_name = get_output_channel_name(process, output_data)

    # NOTE(review): the sink is triggered with the *source* output channel
    # name and the handler stored by `connect` is not used here; this only
    # works when both ends of a link share the same channel name (true for
    # every link created in `main`) - confirm before wiring other names.
    for sink_process, _handler in connections.get(
        (process, output_channel_name), ()
    ):
        trigger(sink_process, output_data, output_channel_name)


def main(input_data, channel, options):
    """Build the processing chain and run it on `input_data`.

    The energy axis and the flattened spectra (`energy` and `mu`) are first
    restricted to the points whose energy is strictly above 100 and strictly
    below (max energy - 100). The processes are then created, configured,
    chained as
    IgnoreProcess -> pre_edge -> autobk -> xftf -> noise -> DumpXasObject
    and the first one is triggered.

    :param input_data: XAS object to process; modified in place by the
                       energy cropping
    :param channel: name of the channel on which `input_data` is injected
    :param options: parsed command line options providing
                    `set_pre_edge_params`, `set_autobk_params`,
                    `set_xftf_params` and `set_noise_params`
    """
    # The mask must be computed before `input_data.energy` is overwritten.
    mask = (input_data.energy > 100) & (
        input_data.energy < (input_data.energy.max() - 100)
    )
    input_data.energy = input_data.energy[mask]
    input_data.spectra.data.flat.energy = input_data.spectra.data.flat.energy[mask]
    input_data.spectra.data.flat.mu = input_data.spectra.data.flat.mu[mask]

    process_0 = est.core.process.io.DumpXasObject()
    process_0.set_properties({"_output_file_setting": "output_bm23.h5"})

    process_1 = pypushflow.utils.IgnoreProcess()
    process_1.set_properties(
        {
            "_I0_url_setting": "silx:///users_ext/dev/esrf/workflow/est/Cu_CHA_68E_Proc5b.h5?/110.1/measurement/I0",
            "_I1_url_setting": "silx:///users_ext/dev/esrf/workflow/est/Cu_CHA_68E_Proc5b.h5?/110.1/measurement/I1",
            "_I2_url_setting": "silx:///users_ext/dev/esrf/workflow/est/Cu_CHA_68E_Proc5b.h5?/110.1/measurement/I2",
            "_configuration_url_setting": None,
            "_dimensions_setting": ("dim 2", "dim 1", "dim 0"),
            "_energy_unit_settings": "kiloelectron_volt",
            "_energy_url_setting": "silx:///users_ext/dev/esrf/workflow/est/Cu_CHA_68E_Proc5b.h5?/110.1/measurement/musst_eneenc",
            "_mu_ref_url_setting": "silx:///users_ext/dev/esrf/workflow/est/Cu_CHA_68E_Proc5b.h5?/110.1/measurement/mu_ref",
            "_spec_file_setting": ("", "", "", "", ""),
            "_spectra_url_setting": "silx:///users_ext/dev/esrf/workflow/est/Cu_CHA_68E_Proc5b.h5?/110.1/measurement/mu_trans",
        }
    )

    process_2 = est.core.process.larch.pre_edge.Larch_pre_edge()
    process_2.set_properties(
        {
            "_larchSettings": {
                "e0": None,
                "step": None,
                "pre1": None,
                "pre2": -50.0,
                "norm1": 100.0,
                "norm2": None,
                "nnorm": None,
                "nvict": 0,
                "make_flat": True,
                "emin_area": None,
            }
        }
    )
    process_2.update_properties(options.set_pre_edge_params)

    process_3 = est.core.process.larch.autobk.Larch_autobk()
    process_3.set_properties(
        {
            "_larchSettings": {
                "rbkg": 1,
                "e0": None,
                "edge_step": None,
                "kmin": 0,
                "kmax": None,
                "kweight": 1,
                "dk": 0.1,
                "win": "hanning",
                "nfft": 2048,
                "kstep": 0.05,
                "nclamp": 4,
                "clamp_lo": 1,
                "clamp_hi": 1,
                "calc_uncertaintites": True,
                "err_sigma": 1,
            }
        }
    )
    process_3.update_properties(options.set_autobk_params)

    process_4 = est.core.process.larch.xftf.Larch_xftf()
    process_4.set_properties(
        {
            "_larchSettings": {
                "kmin": 0,
                "kmax": 20,
                "kweight": 0,
                "dk": 1,
                "dk2": None,
                "with_phase": False,
                "window": "kaiser",
                "rmax_out": 6.0,
                "nfft": 2048,
                "kstep": 0.05,
            }
        }
    )
    process_4.update_properties(options.set_xftf_params)

    process_5 = est.core.process.noise.NoiseProcess()
    process_5.set_properties(
        {"e_max": -1, "e_min": -1, "polynomial_order": 2, "window_size": 5}
    )
    process_5.update_properties(options.set_noise_params)

    connect(process_1, "xas_obj", process_2, "xas_obj")
    connect(process_2, "xas_obj", process_3, "xas_obj")
    connect(process_3, "xas_obj", process_4, "xas_obj")
    connect(process_4, "xas_obj", process_5, "xas_obj")
    connect(process_5, "xas_obj", process_0, "xas_obj")

    # start processing
    trigger(process_1, input_data, channel)


if __name__ == "__main__":
    import sys
    import argparse
    from est.app.utils import get_xas_obj
    from est.app.utils import get_url
    from est.app.utils import get_unit
    from est.app.utils import convert_spectra_dims
    from silx.io.url import DataUrl
    from est.core.types import Dim
    from est.io.utils.information import InputInformation
    from est.io.utils.information import SpecInputInformation

    parser = argparse.ArgumentParser(description=__doc__)
    # single file input option
    parser.add_argument(
        "-i",
        "--input",
        dest="input_",
        default=None,
        help="Input of the workflow. Should be a path to a file",
    )
    # input url option
    parser.add_argument(
        "--input-spectra",
        "--spectra",
        dest="input_spectra",
        default=None,
        help="Input spectra url",
    )
    parser.add_argument(
        "--input-spectra-dims",
        "--spectra-dims",
        dest="input_spectra_dims",
        default=None,
        help="Input spectra dimension. Should be a tuple of three values: "
        "(X,Y,channel). If None will take the default dimension "
        "according to the input type.",
    )
    parser.add_argument(
        "--input-energy",
        "--input-channel",
        "--channel",
        dest="input_channel",
        default=None,
        help="Input channel url (usually energy)",
    )
    parser.add_argument(
        "--input-configuration",
        "--configuration",
        dest="input_configuration",
        default=None,
        help="Input configuration url",
    )
    parser.add_argument(
        "--input-energy-unit",
        "--energy-unit",
        dest="input_energy_unit",
        default="eV",
        help="energy unit",
    )
    parser.add_argument(
        "--input-dimensions",
        "--dimensions",
        dest="input_dimensions",
        default="None",
        help="dimension of the input as (Z,Y,X) for example."
        "If None will take default unit according to the input type",
    )
    # I0, I1, I2 & mu_ref
    parser.add_argument(
        "--input-I0",
        "--I0",
        dest="input_I0",
        default="None",
        help="url to I0",
    )
    parser.add_argument(
        "--input-I1",
        "--I1",
        dest="input_I1",
        default="None",
        help="url to I1",
    )
    parser.add_argument(
        "--input-I2",
        "--I2",
        dest="input_I2",
        default="None",
        help="url to I2",
    )
    parser.add_argument(
        "--input-mu-ref",
        "--mu-ref",
        dest="input_mu_ref",
        default="None",
        help="url to mu_ref",
    )
    # spec file specific inputs
    parser.add_argument(
        "--input-energy-col-name",
        "--energy-col-name",
        dest="input_energy_col_name",
        default=None,
        help="Provide name of the energy column for spec file",
    )
    parser.add_argument(
        "--input-abs-col-name",
        "--abs-col-name",
        dest="input_abs_col_name",
        default=None,
        help="Provide name of the absorption column for spec file",
    )
    parser.add_argument(
        "--input-monitor-col-name",
        "--monitor-col-name",
        dest="input_monitor_col_name",
        default=None,
        help="Provide name of the monitor column for spec file",
    )
    parser.add_argument(
        "--input-scan-title",
        "--scan-title",
        dest="input_scan_title_name",
        default=None,
        help="Provide scan title name to consider",
    )
    # handle larch settings
    parser.add_argument(
        "--set-autobk-params",
        dest="set_autobk_params",
        default=None,
        help="set autobk settings",
    )
    parser.add_argument(
        "--set-mback-params",
        dest="set_mback_params",
        default=None,
        help="set mback settings",
    )
    parser.add_argument(
        "--set-mback-norm-params",
        dest="set_mback_norm_params",
        default=None,
        help="set mback norm settings",
    )
    parser.add_argument(
        "--set-pre-edge-params",
        dest="set_pre_edge_params",
        default=None,
        help="set pre-edge settings",
    )
    parser.add_argument(
        "--set-xftf-params",
        dest="set_xftf_params",
        default=None,
        help="set xftf settings",
    )
    # handle noise settings
    # "--set-noise--params" (double dash) is the historical, misspelled flag;
    # it is kept as an alias so existing command lines keep working.
    parser.add_argument(
        "--set-noise-params",
        "--set-noise--params",
        dest="set_noise_params",
        default=None,
        help="set noise settings",
    )

    options = parser.parse_args(sys.argv[1:])

    input_information = InputInformation(
        spectra_url=get_url(options.input_spectra),
        channel_url=get_url(options.input_channel),
        dimensions=convert_spectra_dims(options.input_spectra_dims),
        config_url=get_url(options.input_configuration),
        energy_unit=get_unit(options.input_energy_unit),
        spec_input=SpecInputInformation(
            options.input_,
            options.input_energy_col_name,
            options.input_abs_col_name,
            options.input_monitor_col_name,
            options.input_scan_title_name,
        ),
    )
    input_information.I0 = get_url(options.input_I0)
    input_information.I1 = get_url(options.input_I1)
    input_information.I2 = get_url(options.input_I2)
    input_information.mu_ref = get_url(options.input_mu_ref)
    xas_obj = get_xas_obj(input_information)
    main(input_data=xas_obj, channel="xas_obj", options=options)