# test_ppf_workflow6.py
import sys
import logging
from esrf2pypushflow import job
4
from taskgraphlib import assert_taskgraph_result
5
6
7
8
9
10
11
12

# Send DEBUG-level records of the "esrf2pypushflow" logger to stdout so they
# show up in the test output.
_ppf_logger = logging.getLogger("esrf2pypushflow")
_ppf_logger.setLevel(logging.DEBUG)
_ppf_logger.addHandler(logging.StreamHandler(sys.stdout))


def submodel6():
    """Build the graph description of the ``submodel6`` sub-workflow.

    The graph has four nodes: two task nodes (``addtask2a`` and
    ``addtask2b``) that both reference the
    ``tests.test_ppf_actors.pythonActorAdd.run`` method, and two port nodes
    (``in`` and ``out``) flagged as ``"input"`` and ``"output"`` ports.
    The links form the chain ``in -> addtask2a -> addtask2b -> out``; every
    link sets ``"all_arguments": True``.

    :returns: a dict with the keys ``directed``, ``graph``, ``links``,
        ``multigraph`` and ``nodes`` (node-link style graph description).
    """
    nodes = [
        {
            "id": "addtask2a",
            "ppfmethod": "tests.test_ppf_actors.pythonActorAdd.run",
        },
        {
            "id": "addtask2b",
            "ppfmethod": "tests.test_ppf_actors.pythonActorAdd.run",
        },
        # Port nodes: entry and exit points of the sub-graph.
        {"id": "in", "ppfport": "input"},
        {"id": "out", "ppfport": "output"},
    ]

    links = [
        {"source": "in", "target": "addtask2a", "all_arguments": True},
        {"source": "addtask2a", "target": "addtask2b", "all_arguments": True},
        {"source": "addtask2b", "target": "out", "all_arguments": True},
    ]

    graph = {
        "directed": True,
        "graph": {"name": "submodel6"},
        "links": links,
        "multigraph": False,
        "nodes": nodes,
    }

    return graph


def workflow6():
    """Build the ``workflow6`` graph and the results expected from running it.

    The top-level graph has three nodes: ``addtask1`` (with the static input
    ``{"value": 1}``), ``addtask3`` and ``submodel6``, whose ``"graph"`` entry
    is the sub-graph returned by :func:`submodel6`. The links route
    ``addtask1`` into the sub-graph node ``in`` and the sub-graph node ``out``
    into ``addtask3``; each top-level link carries a nested ``"links"`` list
    naming the sub-graph node involved.

    :returns: a tuple ``(graph, expected_results)``. ``expected_results`` maps
        a node id to its expected output; nodes inside the sub-graph are
        keyed by the tuple ``("submodel6", <node id>)``.
    """
    nodes = [
        {
            "id": "addtask1",
            "inputs": {"value": 1},
            "ppfmethod": "tests.test_ppf_actors.pythonActorAdd.run",
        },
        {
            "id": "addtask3",
            "ppfmethod": "tests.test_ppf_actors.pythonActorAdd.run",
        },
        {"id": "submodel6", "graph": submodel6()},
    ]

    links = [
        {
            "source": "addtask1",
            "target": "submodel6",
            # Nested link: selects the sub-graph node that receives the data.
            "links": [
                {
                    "source": "addtask1",
                    "target": "in",
                    "all_arguments": True,
                }
            ],
        },
        {
            "source": "submodel6",
            "target": "addtask3",
            # Nested link: selects the sub-graph node that provides the data.
            "links": [
                {
                    "source": "out",
                    "target": "addtask3",
                    "all_arguments": True,
                }
            ],
        },
    ]

    graph = {
        "directed": True,
        "graph": {"name": "workflow6"},
        "links": links,
        "multigraph": False,
        "nodes": nodes,
    }

    # NOTE(review): the values suggest each add task increments "value" by one
    # and port nodes pass it through unchanged -- confirm against
    # tests.test_ppf_actors.pythonActorAdd.
    expected_results = {
        "addtask1": {"ppfdict": {"value": 2}},
        ("submodel6", "in"): {"ppfdict": {"value": 2}},
        ("submodel6", "addtask2a"): {"ppfdict": {"value": 3}},
        ("submodel6", "addtask2b"): {"ppfdict": {"value": 4}},
        ("submodel6", "out"): {"ppfdict": {"value": 4}},
        "addtask3": {"ppfdict": {"value": 5}},
    }

    return graph, expected_results


def test_workflow6(tmpdir):
    """Run ``workflow6`` and check the result of every node.

    :param tmpdir: directory whose string form is passed as ``root_uri`` in
        ``varinfo`` (presumably the pytest ``tmpdir`` fixture -- confirm).
    """
    varinfo = {"root_uri": str(tmpdir)}
    graph, expected = workflow6()
    job(graph, varinfo=varinfo)
    assert_taskgraph_result(graph, expected, varinfo)