process.py 3.61 KB
Newer Older
1
2
import argparse
import sys
payno's avatar
payno committed
3
4
from pypushflow.Workflow import ProcessableWorkflow
from pypushflow.representation.scheme.ows_parser import OwsParser
5
6
7

try:
    from est.io.utils.pymca import read_spectrum
payno's avatar
payno committed
8

9
10
11
    has_read_spectrum = True
except ImportError:
    has_read_spectrum = False
12
13
from est.core.types import Spectrum, XASObject
from est.core.io import read_xas
14
import logging
payno's avatar
payno committed
15
import signal
payno's avatar
payno committed
16
from pypushflow.representation.scheme.scheme import Scheme
payno's avatar
payno committed
17

18
19
# h5py is an optional dependency: record whether it can be imported instead
# of failing when this module is loaded.
try:
    import h5py
except ImportError:
    # Catch only import failures, like the read_spectrum guard does; a bare
    # ``except`` would also swallow KeyboardInterrupt and SystemExit.
    has_h5py = False
else:
    has_h5py = True
payno's avatar
payno committed
24
from typing import Union
25

26
# Import-time side effect: set up default logging with the DEBUG threshold.
logging.basicConfig(level=logging.DEBUG)
27
28
29
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


payno's avatar
payno committed
30
31
32
33
34
35
def exec_(
    scheme: Scheme,
    input_: Union[str, None, dict] = None,
    output_: Union[str, None, dict] = None,
    timeout: Union[int, None] = None,
):
    """Execute the workflow described by *scheme* on one spectrum.

    Steps, in order:

    1. If ``input_`` is truthy, store it in the ``_input_file_setting``
       property of every node that already defines this property.
    2. If ``output_`` is truthy, do the same with ``_output_file_setting``
       and log a warning when no node defines it.
    3. Build a ``ProcessableWorkflow`` from the scheme.
    4. Read ``(energy, mu)`` with ``read_spectrum(input_)``, wrap them in a
       one-spectrum ``XASObject`` and trigger the workflow start actor with
       ``("data", <XASObject as dict>)``.
    5. Join the workflow end actor with ``timeout`` and return its
       ``out_data``.

    A SIGINT handler that stops the process is installed for the duration of
    the execution; the handler that was active before is restored on exit.

    :param scheme: workflow description; its nodes may be modified in place.
    :param input_: value written to the input nodes and passed to
                   ``read_spectrum``.
    :param output_: value written to the output nodes.
    :param timeout: forwarded unchanged to ``join`` of the end actor
                    (presumably seconds - TODO confirm against pypushflow).
    :return: ``out_data`` of the workflow end actor.
    :raises ValueError: if ``read_spectrum`` could not be imported.
    """
    if input_:
        # monkey patch the input file of the nodes exposing an input setting
        for node in scheme.nodes:
            if node.properties and "_input_file_setting" in node.properties:
                node.properties["_input_file_setting"] = input_

    # TODO: if the input file contains configuration, then overwrite the node
    # properties...
    if output_:
        found_output = False
        # monkey patch the output file of the nodes exposing an output setting
        for node in scheme.nodes:
            if node.properties and "_output_file_setting" in node.properties:
                node.properties["_output_file_setting"] = output_
                found_output = True
        if not found_output:
            _logger.warning(
                "No node for processing output found. output "
                "information provided will be ignored"
            )

    workflow = ProcessableWorkflow(scheme=scheme)

    # add SIGINT capture
    def signal_handler(sig, frame):
        _logger.warning("stop workflow execution on user request")
        workflow._end_actor.join(0)
        sys.exit(0)

    previous_handler = signal.signal(signal.SIGINT, signal_handler)
    try:
        # TODO: handle several entries from url on top of .dat file... using
        # read_xas
        if not has_read_spectrum:
            raise ValueError("Unable to read spectrum")
        energy, mu = read_spectrum(input_)
        spectrum = Spectrum(energy=energy, mu=mu)
        xas_obj = XASObject(energy=energy, spectra=(spectrum,), dim1=1, dim2=1)

        workflow._start_actor.trigger(("data", xas_obj.to_dict()))
        workflow._end_actor.join(timeout)
        res = workflow._end_actor.out_data
    finally:
        # Do not leak our handler (and its reference to this workflow) to the
        # caller. ``None`` means the previous handler was not installed from
        # Python and therefore cannot be re-installed through signal.signal.
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)

    title = scheme.title or "unknow"
    _logger.info("workflow '%s' completed with %s", title, input_)
    return res

88
89
90
91
92
93
94
95
96
97
98
99

def getinputinfo():
    """Return the one-line text describing the expected command line."""
    usage = "xas process workflow_file_desc.ows [input]"
    return usage


def getInputFrmHdf5(file_path):
    """Placeholder that is not implemented yet.

    :param file_path: currently unused.
    :raises NotImplementedError: always.
    """
    raise NotImplementedError


def main(argv):
    """Parse the command line, load the workflow file and execute it.

    :param argv: complete argument vector; its first element is skipped
                 before parsing.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "workflow_file",
        help="Workflow file providing the workflow description (.ows, .xml)",
    )

    # (flags, destination attribute, help text) of the optional arguments
    optional_arguments = (
        (
            ("-i", "--input"),
            "input_",
            "Input of the workflow. require at most one "
            "instance of XASInputOW",
        ),
        (
            ("-o", "--output"),
            "output_",
            "Output file of the workflow. Require at most one "
            "instance of XASOutputOW",
        ),
    )
    for flags, destination, help_text in optional_arguments:
        cli.add_argument(*flags, dest=destination, default=None, help=help_text)

    parsed = cli.parse_args(argv[1:])

    workflow_scheme = OwsParser.scheme_load(
        parsed.workflow_file, load_handlers=True
    )
    exec_(workflow_scheme, parsed.input_, parsed.output_)
120

payno's avatar
payno committed
121

122
123
# Script entry point: hand the full argument vector to main(), which skips
# its first element before parsing.
if __name__ == "__main__":
    main(sys.argv)