process.py 6.43 KB
Newer Older
1
2
import argparse
import sys
payno's avatar
payno committed
3
4
from pypushflow.Workflow import ProcessableWorkflow
from pypushflow.representation.scheme.ows_parser import OwsParser
5
import logging
payno's avatar
payno committed
6
import signal
payno's avatar
payno committed
7
from pypushflow.representation.scheme.scheme import Scheme
8
from silx.io.url import DataUrl
9
from est.units import ur
payno's avatar
payno committed
10
11
from .utils import get_unit
from .utils import get_url
12
13
from .utils import convert_spectra_dims
from .utils import get_xas_obj
payno's avatar
payno committed
14

15
16
# h5py is an optional dependency: probe for it once at import time and record
# the outcome in ``has_h5py``.
try:
    import h5py  # noqa: F401  (imported only to test availability)
except ImportError:
    # Only a failed import means "not available"; anything else (e.g.
    # KeyboardInterrupt) must propagate instead of being swallowed.
    has_h5py = False
else:
    has_h5py = True
payno's avatar
payno committed
21
from typing import Union
22

23
# NOTE: this configures the root logger at DEBUG level as a side effect of
# importing the module.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger used by the helpers below.
_logger = logging.getLogger(__name__)


27
28
29
30
31
32
33
34
def _insert_input_in_scheme(
    scheme,
    input_,
    input_spectra_url,
    input_spectra_dims,
    input_channel_url,
    input_configuration_url,
):
35
36
37
38
    """update 'starting' node properties to include the provided input"""
    # monkey patch the input file for start nodes if an input is given
    for node in scheme.nodes:
        if node.properties and "_input_file_setting" in node.properties:
39
            if input_ is not None:
40
                node.properties["_input_file_setting"] = input_
41
42
43
44
45
46
47
48
49
50
            if input_spectra_url is not None:
                node.properties["_spectra_url_setting"] = input_spectra_url.path()
            if input_spectra_dims is not None:
                node.properties["_dimensions_setting"] = input_spectra_dims
            if input_channel_url is not None:
                node.properties["_energy_url_setting"] = input_channel_url.path()
            if input_configuration_url is not None:
                node.properties[
                    "_configuration_url_setting"
                ] = input_configuration_url.path()
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67


def _insert_output_in_scheme(scheme, output_):
    """update 'starting' node properties to include the provided input"""
    found_output = False
    # monkey patch the input file for start nodes if an input is given
    for node in scheme.nodes:
        if node.properties and "_output_file_setting" in node.properties:
            node.properties["_output_file_setting"] = output_
            found_output = True
    if not found_output:
        _logger.warning(
            "No node for processing output found. output "
            "information provided will be ignored"
        )


payno's avatar
payno committed
68
69
def exec_(
    scheme: Scheme,
    input_energy_unit=ur.eV,
    input_: Union[str, None, dict] = None,
    input_spectra_url: Union[None, DataUrl] = None,
    input_dims: Union[None, tuple] = None,
    input_channel_url: Union[None, DataUrl] = None,
    input_configuration_url: Union[None, DataUrl] = None,
    output_: Union[str, None, dict] = None,
    timeout: Union[int, None] = None,
):
    """Run the workflow described by ``scheme`` and return its output data.

    The provided input / output information is first written into the scheme
    nodes, then a ``ProcessableWorkflow`` is built from the scheme, triggered
    with the object returned by ``get_xas_obj`` (as a dict) and joined.

    While the workflow is running, SIGINT is intercepted to stop waiting on
    the workflow and exit the interpreter with status 0. The handler that was
    installed before the call is put back once execution is over.

    :param scheme: workflow scheme to execute.
    :param input_energy_unit: forwarded to ``get_xas_obj``.
    :param input_: single input (file). Mutually exclusive with the url
        parameters.
    :param input_spectra_url: url of the spectra.
    :param input_dims: spectra dimensions, forwarded to the scheme nodes and
        to ``get_xas_obj``.
    :param input_channel_url: url of the channel.
    :param input_configuration_url: url of the configuration.
    :param output_: output to store in the scheme output nodes.
    :param timeout: value given to the end actor ``join``.
    :return: ``out_data`` of the workflow end actor.
    :raises ValueError: if both ``input_`` and at least one url are provided.
    """
    has_url_information = (
        input_spectra_url or input_channel_url or input_configuration_url
    )
    if input_ is not None and has_url_information:
        raise ValueError("You cannot provide an input file and input urls")

    if input_ is not None or has_url_information:
        _insert_input_in_scheme(
            scheme=scheme,
            input_=input_,
            input_spectra_url=input_spectra_url,
            input_spectra_dims=input_dims,
            input_channel_url=input_channel_url,
            input_configuration_url=input_configuration_url,
        )
    if output_ is not None:
        _insert_output_in_scheme(scheme=scheme, output_=output_)

    workflow = ProcessableWorkflow(scheme=scheme)

    # add SIGINT capture
    def signal_handler(sig, frame):
        _logger.warning("stop workflow execution on user request")
        workflow._end_actor.join(0)
        sys.exit(0)

    previous_handler = signal.signal(signal.SIGINT, signal_handler)
    try:
        xas_obj = get_xas_obj(
            input_energy_unit=input_energy_unit,
            input_=input_,
            input_spectra_url=input_spectra_url,
            input_dims=input_dims,
            input_channel_url=input_channel_url,
            input_configuration_url=input_configuration_url,
        )

        workflow._start_actor.trigger(("data", xas_obj.to_dict()))
        workflow._end_actor.join(timeout)
        res = workflow._end_actor.out_data
    finally:
        # Do not leave the process-wide SIGINT handler bound to this
        # workflow. ``signal.signal`` returns None when the previous handler
        # was not installed from Python; it cannot be re-installed then.
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)

    title = scheme.title or "unknow"
    # Lazy logging arguments: same message, formatted only if emitted.
    _logger.info("workflow '%s' completed with %s", title, input_)
    return res

124
125
126
127

def main(argv):
    """Command line entry point: load a workflow file and execute it.

    ``argv[0]`` is skipped; the remaining items are parsed with argparse. The
    workflow file is loaded with ``OwsParser.scheme_load`` and handed to
    ``exec_`` together with the input / output options.

    :param argv: full argument list, program name included (``sys.argv``).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "workflow_file",
        help="Workflow file providing the workflow description (.ows, .xml)",
    )
    # single file input option
    parser.add_argument(
        "-i",
        "--input",
        dest="input_",
        default=None,
        help="Input of the workflow. Should be a path to a file",
    )
    # input url option
    parser.add_argument(
        "--input-spectra",
        dest="input_spectra",
        default=None,
        help="Input spectra url",
    )
    parser.add_argument(
        "--input-spectra-dims",
        dest="input_spectra_dims",
        default=None,
        help="Input spectra dimension. Should be a tuple of three values: "
        "(X,Y,channel). If None will take the default dimension "
        "according to the input type.",
    )
    parser.add_argument(
        "--input-channel",
        dest="input_channel",
        default=None,
        help="Input channel url (usually energy)",
    )
    parser.add_argument(
        "--input-configuration",
        dest="input_configuration",
        default=None,
        help="Input configuration url",
    )
    parser.add_argument(
        "--input-energy-unit",
        dest="input_energy_unit",
        default="eV",
        help="energy unit",
    )
    # NOTE(review): this option is parsed but never forwarded to ``exec_``;
    # it is kept so existing command lines keep working. Confirm whether it
    # should be dropped in favour of --input-spectra-dims.
    parser.add_argument(
        "--input-dimensions",
        dest="input_dimensions",
        default="None",
        help="dimension of the input as (Z,Y,X) for example. "
        "If None will take default unit according to the input type",
    )
    # output option
    parser.add_argument(
        "-o",
        "--output",
        dest="output_",
        default=None,
        help="Output file of the workflow. Require at most one "
        "instance of XASOutputOW",
    )
    options = parser.parse_args(argv[1:])

    scheme = OwsParser.scheme_load(options.workflow_file, load_handlers=True)
    exec_(
        scheme=scheme,
        input_=options.input_,
        input_spectra_url=get_url(options.input_spectra),
        input_dims=convert_spectra_dims(options.input_spectra_dims),
        input_channel_url=get_url(options.input_channel),
        input_configuration_url=get_url(options.input_configuration),
        output_=options.output_,
        input_energy_unit=get_unit(options.input_energy_unit),
    )
201

payno's avatar
payno committed
202

203
204
if __name__ == "__main__":
    main(sys.argv)