graph.py 24.8 KB
Newer Older
1
import os
2
3
import enum
import json
4
from collections import Counter
5
from collections.abc import Mapping
6
from typing import Dict, Iterable, Optional, Set
7
import networkx
8

9
10
11
from . import inittask
from .utils import qualname
from .utils import dict_merge
12
from .subgraph import extract_graph_nodes
13
from .subgraph import add_subgraph_links
14
15
16
from .task import Task
from .node import NodeIdType
from .node import node_id_from_json
17
18
19
20

# Sentinel condition value acting as the "else" branch of a conditional split:
# it complements any other condition value (see
# TaskGraph._node_has_noncovered_conditions and _get_node_expanded_conditions).
CONDITIONS_ELSE_VALUE = "__other__"


21
def load_graph(source=None, representation=None, **load_options):
    """Return a :class:`TaskGraph` for *source*.

    An existing :class:`TaskGraph` is returned unchanged; any other source
    (dict, JSON string, file name, `networkx` graph, ...) is handed to the
    :class:`TaskGraph` constructor together with *representation* and
    *load_options*.
    """
    if isinstance(source, TaskGraph):
        return source
    return TaskGraph(source=source, representation=representation, **load_options)
26
27
28
29
30
31
32
33


def set_graph_defaults(graph_as_dict):
    """Fill in the node-link keys expected by `networkx` graph deserialization.

    Missing keys are added in place; existing values are left untouched.
    """
    graph_as_dict.setdefault("directed", True)
    for key in ("nodes", "links"):
        # a fresh list per key so callers never share a mutable default
        graph_as_dict.setdefault(key, [])


34
def node_has_links(graph, node_id):
    """Return True when *node_id* has at least one outgoing or incoming link."""
    # Probe the successor iterator first, then the predecessor iterator;
    # a single yielded item is enough to decide.
    for _ in graph.successors(node_id):
        return True
    for _ in graph.predecessors(node_id):
        return True
    return False


45
def merge_graphs(graphs, graph_attrs=None, rename_nodes=None, **load_options):
    """Compose several graphs into a single :class:`TaskGraph`.

    :param graphs: graph sources accepted by :func:`load_graph`
    :param graph_attrs: attributes merged into the resulting graph's attributes
    :param rename_nodes: one flag per graph; when true, that graph's node
                         identifiers are prefixed with the graph's label
                         to avoid collisions (defaults to all true)
    :param load_options: forwarded to :func:`load_graph`
    """
    if rename_nodes is None:
        rename_nodes = [True] * len(graphs)
    else:
        assert len(graphs) == len(rename_nodes)
    composable = []
    for source, rename in zip(graphs, rename_nodes):
        taskgraph = load_graph(source, **load_options)
        label = repr(taskgraph)
        nxgraph = taskgraph.graph
        if rename:
            # Qualify every node id with the graph label: id -> (label, id)
            nxgraph = networkx.relabel_nodes(
                nxgraph, {node: (label, node) for node in nxgraph.nodes}, copy=True
            )
        composable.append(nxgraph)
    merged = load_graph(networkx.compose_all(composable), **load_options)
    if graph_attrs:
        merged.graph.graph.update(graph_attrs)
    return merged


65
66
def flatten_multigraph(graph: networkx.DiGraph) -> networkx.DiGraph:
    """The attributes of links between the same two nodes are merged."""
    if not graph.is_multigraph():
        return graph

    flat = networkx.DiGraph(**graph.graph)

    # Collapse parallel edges: accumulate the attributes of every edge
    # (source, target, key) into a single mapping per (source, target).
    merged_edge_attrs = dict()
    for edge, attrs in graph.edges.items():
        source_target = edge[:2]
        accumulator = merged_edge_attrs.setdefault(source_target, dict())
        # the "links" value on both sides may be a sequence;
        # such sequences need to be concatenated, not replaced
        dict_merge(accumulator, attrs, contatenate_sequences=True)

    for node, attrs in graph.nodes.items():
        flat.add_node(node, **attrs)
    for (source, target), attrs in merged_edge_attrs.items():
        flat.add_edge(source, target, **attrs)
    return flat


86
def get_subgraphs(graph: networkx.DiGraph, **load_options):
    """Load the sub-workflows referenced by nodes with task type ``"graph"``.

    :returns: mapping of node identifier to the loaded :class:`TaskGraph`;
              each subgraph gets its referencing node's id (and label, when
              present) as graph attributes.
    """
    subgraphs = dict()
    for node_id, node_attrs in graph.nodes.items():
        task_type, task_info = inittask.task_executable_info(
            node_attrs, node_id=node_id, all=True
        )
        if task_type != "graph":
            continue
        subgraph = load_graph(task_info["task_identifier"], **load_options)
        subgraph.graph.graph["id"] = node_id
        label = node_attrs.get("label")
        if label:
            subgraph.graph.graph["label"] = label
        subgraphs[node_id] = subgraph
    return subgraphs


Wout De Nolf's avatar
Wout De Nolf committed
102
103
def _ewoks_jsonload_hook_pair(item):
    """Convert the value of a node-identifier key from its JSON form.

    Other key/value pairs pass through unchanged.
    """
    key, value = item
    # These keys hold node identifiers in the persistent representation.
    if key in ("source", "target", "sub_source", "sub_target", "id", "sub_node"):
        return key, node_id_from_json(value)
    return key, value


def ewoks_jsonload_hook(items):
    """``object_pairs_hook`` for :mod:`json` that restores node identifiers."""
    converted = (_ewoks_jsonload_hook_pair(item) for item in items)
    return dict(converted)


113
114
115
116
117
118
119
120
def abs_path(path, root_dir=None):
    """Return *path* as an absolute path.

    Relative paths are resolved against *root_dir* when given, otherwise
    against the current working directory. Absolute paths pass through.
    """
    if os.path.isabs(path):
        return path
    combined = os.path.join(root_dir, path) if root_dir else path
    return os.path.abspath(combined)


121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class TaskGraph:
    """The API for graph analysis is provided by `networkx`.
    Any directed graph is supported (cyclic or acyclic).

    Loop over the dependencies of a task

    .. code-block:: python

        for source in taskgraph.predecessors(target):
            link_attrs = taskgraph.graph[source][target]

    Loop over the tasks dependent on a task

    .. code-block:: python

        for target in taskgraph.successors(source):
            link_attrs = taskgraph.graph[source][target]

    Instantiate a task

    .. code-block:: python

        task = taskgraph.instantiate_task(name, varinfo=varinfo, inputs=inputs)

    For acyclic graphs, sequential task execution can be done like this:

    .. code-block:: python

        taskgraph.execute()
    """

    # Supported persistent representations of a graph.
    GraphRepresentation = enum.Enum(
        "GraphRepresentation", "json_file json_dict json_string yaml"
    )

    def __init__(self, source=None, representation=None, **load_options):
        # The constructor only delegates to `load`; see `load` for parameters.
        self.load(source=source, representation=representation, **load_options)

    def __repr__(self):
        return self.graph_label

    @property
    def graph_id(self):
        # Falls back to the qualified class name when the graph has no "id".
        return self.graph.graph.get("id", qualname(type(self)))

    @property
    def graph_label(self):
        # Falls back to the graph id when the graph has no "label".
        return self.graph.graph.get("label", self.graph_id)

    # NOTE(review): defining __eq__ without __hash__ makes instances
    # unhashable (Python sets __hash__ to None) — confirm this is intended.
    def __eq__(self, other):
        if not isinstance(other, type(self)):
            raise TypeError(other, type(other))
        # Equality is defined on the persistent (dumped) representation.
        return self.dump() == other.dump()

    def load(self, source=None, representation=None, root_dir=None):
        """From persistent to runtime representation

        :param source: a `TaskGraph`, a `networkx.Graph`, a node-link mapping,
                       a JSON string or a ``.json`` file name; an empty source
                       yields an empty directed graph
        :param representation: member of :attr:`GraphRepresentation`; guessed
                               from *source* when omitted
        :param root_dir: directory against which relative file names are resolved
        :raises TypeError: unknown representation or non-directed graph
        """
        # Guess the representation from the source type when not given.
        if representation is None:
            if isinstance(source, Mapping):
                representation = self.GraphRepresentation.json_dict
            elif isinstance(source, str):
                if source.endswith(".json"):
                    representation = self.GraphRepresentation.json_file
                else:
                    representation = self.GraphRepresentation.json_string
        if not source:
            graph = networkx.DiGraph()
        elif isinstance(source, networkx.Graph):
            graph = source
        elif isinstance(source, TaskGraph):
            graph = source.graph
        elif representation == self.GraphRepresentation.json_dict:
            set_graph_defaults(source)
            graph = networkx.readwrite.json_graph.node_link_graph(source)
        elif representation == self.GraphRepresentation.json_file:
            source = abs_path(source, root_dir)
            with open(source, mode="r") as f:
                # `ewoks_jsonload_hook` restores node ids from their JSON form
                source = json.load(f, object_pairs_hook=ewoks_jsonload_hook)
            set_graph_defaults(source)
            graph = networkx.readwrite.json_graph.node_link_graph(source)
        elif representation == self.GraphRepresentation.json_string:
            source = json.loads(source, object_pairs_hook=ewoks_jsonload_hook)
            set_graph_defaults(source)
            graph = networkx.readwrite.json_graph.node_link_graph(source)
        elif representation == self.GraphRepresentation.yaml:
            source = abs_path(source, root_dir)
            graph = networkx.readwrite.read_yaml(source)
        else:
            raise TypeError(representation, type(representation))

        if not networkx.is_directed(graph):
            raise TypeError(graph, type(graph))

        # Nodes with task type "graph" reference sub-workflows: extract them,
        # merge their graphs into this one and restore the boundary links.
        subgraphs = get_subgraphs(graph, root_dir=root_dir)
        if subgraphs:
            # Extract
            edges, update_attrs = extract_graph_nodes(graph, subgraphs)
            graph = flatten_multigraph(graph)

            # Merged
            self.graph = graph
            graphs = [self] + list(subgraphs.values())
            rename_nodes = [False] + [True] * len(subgraphs)
            graph = merge_graphs(
                graphs,
                graph_attrs=graph.graph,
                rename_nodes=rename_nodes,
                root_dir=root_dir,
            ).graph

            # Re-link
            add_subgraph_links(graph, edges, update_attrs)

        self.graph = flatten_multigraph(graph)
        self.validate_graph()

    def dump(self, destination=None, representation=None, **kw):
        """From runtime to persistent representation

        :param destination: file name (only used by the file representations)
        :param representation: member of :attr:`GraphRepresentation`; guessed
                               from *destination* when omitted
        :param kw: forwarded to the serializer
        :returns: the node-link dict, the JSON string, the destination file
                  name, or whatever the yaml writer returns
        :raises TypeError: unknown representation
        """
        if representation is None:
            if isinstance(destination, str) and destination.endswith(".json"):
                representation = self.GraphRepresentation.json_file
            else:
                representation = self.GraphRepresentation.json_dict
        if representation == self.GraphRepresentation.json_dict:
            return networkx.readwrite.json_graph.node_link_data(self.graph)
        elif representation == self.GraphRepresentation.json_file:
            dictrepr = self.dump()
            with open(destination, mode="w") as f:
                json.dump(dictrepr, f, **kw)
            return destination
        elif representation == self.GraphRepresentation.json_string:
            dictrepr = self.dump()
            return json.dumps(dictrepr, **kw)
        elif representation == self.GraphRepresentation.yaml:
            return networkx.readwrite.write_yaml(self.graph, destination, **kw)
        else:
            raise TypeError(representation, type(representation))

    def serialize(self):
        """Return the JSON string representation of the graph."""
        return self.dump(representation=self.GraphRepresentation.json_string)

    @property
    def is_cyclic(self):
        # True when the graph contains at least one cycle.
        return not networkx.is_directed_acyclic_graph(self.graph)

    @property
    def has_conditional_links(self):
        # True when any link carries "conditions" or "on_error" attributes.
        for attrs in self.graph.edges.values():
            if attrs.get("conditions") or attrs.get("on_error"):
                return True
        return False

    def instantiate_task(
        self,
        node_id: NodeIdType,
        varinfo: Optional[dict] = None,
        inputs: Optional[dict] = None,
    ) -> Task:
        """Named arguments are dynamic input and Variable config.
        Default input from the persistent representation are added internally.
        """
        # Dynamic input has priority over default input
        nodeattrs = self.graph.nodes[node_id]
        return inittask.instantiate_task(
            nodeattrs, node_id=node_id, varinfo=varinfo, inputs=inputs
        )

    def instantiate_task_static(
        self,
        node_id: NodeIdType,
        tasks: Optional[Dict[Task, int]] = None,
        varinfo: Optional[dict] = None,
        evict_result_counter: Optional[Dict[NodeIdType, int]] = None,
    ) -> Task:
        """Instantiate destination task while no access to the dynamic inputs.
        Side effect: `tasks` will contain all predecessors.

        :param evict_result_counter: when provided, a predecessor task is
            removed from `tasks` once this many of its successors consumed it
        :raises RuntimeError: the graph is cyclic (recursion would not end)
        """
        if self.is_cyclic:
            raise RuntimeError(f"{self} is cyclic")
        if tasks is None:
            tasks = dict()
        if evict_result_counter is None:
            evict_result_counter = dict()
        # Input from previous tasks (instantiate them if needed)
        dynamic_inputs = dict()
        for source_node_id in self.predecessors(node_id):
            source_task = tasks.get(source_node_id, None)
            if source_task is None:
                # Recurse: predecessors are instantiated depth-first.
                source_task = self.instantiate_task_static(
                    source_node_id,
                    tasks=tasks,
                    varinfo=varinfo,
                    evict_result_counter=evict_result_counter,
                )
            link_attrs = self.graph[source_node_id][node_id]
            inittask.add_dynamic_inputs(
                dynamic_inputs, link_attrs, source_task.output_variables
            )
            # Evict intermediate results
            if evict_result_counter:
                evict_result_counter[source_node_id] -= 1
                if evict_result_counter[source_node_id] == 0:
                    tasks.pop(source_node_id)
        # Instantiate the requested task
        target_task = self.instantiate_task(
            node_id, varinfo=varinfo, inputs=dynamic_inputs
        )
        tasks[node_id] = target_task
        return target_task

    def successors(self, node_id: NodeIdType, **include_filter):
        """Yield direct successors of *node_id* matching the filters."""
        yield from self._iter_downstream_nodes(
            node_id, recursive=False, **include_filter
        )

    def descendants(self, node_id: NodeIdType, **include_filter):
        """Yield all downstream nodes of *node_id* matching the filters."""
        yield from self._iter_downstream_nodes(
            node_id, recursive=True, **include_filter
        )

    def predecessors(self, node_id: NodeIdType, **include_filter):
        """Yield direct predecessors of *node_id* matching the filters."""
        yield from self._iter_upstream_nodes(node_id, recursive=False, **include_filter)

    def ancestors(self, node_id: NodeIdType, **include_filter):
        """Yield all upstream nodes of *node_id* matching the filters."""
        yield from self._iter_upstream_nodes(node_id, recursive=True, **include_filter)

    def has_successors(self, node_id: NodeIdType, **include_filter):
        """Return True when at least one matching successor exists."""
        return self._iterator_has_items(self.successors(node_id, **include_filter))

    def has_descendants(self, node_id: NodeIdType, **include_filter):
        """Return True when at least one matching descendant exists."""
        return self._iterator_has_items(self.descendants(node_id, **include_filter))

    def has_predecessors(self, node_id: NodeIdType, **include_filter):
        """Return True when at least one matching predecessor exists."""
        return self._iterator_has_items(self.predecessors(node_id, **include_filter))

    def has_ancestors(self, node_id: NodeIdType, **include_filter):
        """Return True when at least one matching ancestor exists."""
        return self._iterator_has_items(self.ancestors(node_id, **include_filter))

    @staticmethod
    def _iterator_has_items(iterator):
        # Cheap emptiness test: consume at most one item.
        try:
            next(iterator)
            return True
        except StopIteration:
            return False

    def _iter_downstream_nodes(self, node_id: NodeIdType, **kw):
        yield from self._iter_nodes(node_id, upstream=False, **kw)

    def _iter_upstream_nodes(self, node_id: NodeIdType, **kw):
        yield from self._iter_nodes(node_id, upstream=True, **kw)

    def _iter_nodes(
        self,
        node_id: NodeIdType,
        upstream=False,
        recursive=False,
        _visited=None,
        **include_filter,
    ):
        """Recursion is not stopped by the node or link filters.
        Recursion is stopped by either not having any successors/predecessors
        or encountering a node that has been visited already.
        The original node on which we start iterating is never included.
        """
        if recursive:
            if _visited is None:
                _visited = set()
            elif node_id in _visited:
                return
            _visited.add(node_id)
        if upstream:
            iter_next_nodes = self.graph.predecessors
        else:
            iter_next_nodes = self.graph.successors
        for next_id in iter_next_nodes(node_id):
            node_is_included = self._filter_node(next_id, **include_filter)
            # Link filters always take (source, target) in graph direction.
            if upstream:
                link_is_included = self._filter_link(next_id, node_id, **include_filter)
            else:
                link_is_included = self._filter_link(node_id, next_id, **include_filter)
            if node_is_included and link_is_included:
                yield next_id
            if recursive:
                yield from self._iter_nodes(
                    next_id,
                    upstream=upstream,
                    recursive=True,
                    _visited=_visited,
                    **include_filter,
                )

    def _filter_node(
        self,
        node_id: NodeIdType,
        node_filter=None,
        node_has_predecessors=None,
        node_has_successors=None,
        **linkfilter,
    ):
        """Filters are combined with the logical AND"""
        # `linkfilter` swallows the link-filter keywords handled elsewhere.
        if callable(node_filter):
            if not node_filter(node_id):
                return False
        if node_has_predecessors is not None:
            if self.has_predecessors(node_id) != node_has_predecessors:
                return False
        if node_has_successors is not None:
            if self.has_successors(node_id) != node_has_successors:
                return False
        return True

    def _filter_link(
        self,
        source_id: NodeIdType,
        target_id: NodeIdType,
        link_filter=None,
        link_has_on_error=None,
        link_has_conditions=None,
        link_is_conditional=None,
        link_has_required=None,
        **nodefilter,
    ):
        """Filters are combined with the logical AND"""
        # `nodefilter` swallows the node-filter keywords handled elsewhere.
        if callable(link_filter):
            if not link_filter(source_id, target_id):
                return False
        if link_has_on_error is not None:
            if self._link_has_on_error(source_id, target_id) != link_has_on_error:
                return False
        if link_has_conditions is not None:
            if self._link_has_conditions(source_id, target_id) != link_has_conditions:
                return False
        if link_is_conditional is not None:
            if self._link_is_conditional(source_id, target_id) != link_is_conditional:
                return False
        if link_has_required is not None:
            if self._link_has_required(source_id, target_id) != link_has_required:
                return False
        return True

    def _link_has_conditions(self, source_id: NodeIdType, target_id: NodeIdType):
        # True when the link carries a non-empty "conditions" attribute.
        link_attrs = self.graph[source_id][target_id]
        return bool(link_attrs.get("conditions", False))

    def _link_has_on_error(self, source_id: NodeIdType, target_id: NodeIdType):
        # True when the link carries a non-empty "on_error" attribute.
        link_attrs = self.graph[source_id][target_id]
        return bool(link_attrs.get("on_error", False))

    def _link_has_required(self, source_id: NodeIdType, target_id: NodeIdType):
        # True when the link carries a non-empty "required" attribute.
        link_attrs = self.graph[source_id][target_id]
        return bool(link_attrs.get("required", False))

    def _link_is_conditional(self, source_id: NodeIdType, target_id: NodeIdType):
        # Conditional means either error handling or value conditions.
        link_attrs = self.graph[source_id][target_id]
        return bool(
            link_attrs.get("on_error", False) or link_attrs.get("conditions", False)
        )

    def link_is_required(self, source_id: NodeIdType, target_id: NodeIdType):
        """A link is required when marked as such, or when it is unconditional
        and its source node is required."""
        if self._link_has_required(source_id, target_id):
            return True
        if self._link_is_conditional(source_id, target_id):
            return False
        return self._node_is_required(source_id)

    def _node_is_required(self, node_id: NodeIdType):
        # A node is required unless it has a conditional, non-required
        # link somewhere upstream.
        return not self.has_ancestors(
            node_id, link_has_required=False, link_is_conditional=True
        )

    def _required_predecessors(self, target_id: NodeIdType):
        """Yield the predecessors connected through a required link."""
        for source_id in self.predecessors(target_id):
            if self.link_is_required(source_id, target_id):
                yield source_id

    def _has_required_predecessors(self, node_id: NodeIdType):
        return self._iterator_has_items(self._required_predecessors(node_id))

    def _has_required_static_inputs(self, node_id: NodeIdType):
        """Returns True when the default inputs cover all required inputs."""
        node_attrs = self.graph.nodes[node_id]
        inputs_complete = node_attrs.get("inputs_complete", None)
        if isinstance(inputs_complete, bool):
            # method and script tasks always have an empty `required_input_names`
            # although they may have required input. This keyword is used to
            # manually indicate that all required inputs are statically provided.
            return inputs_complete
        taskclass = inittask.get_task_class(node_attrs, node_id=node_id)
        static_inputs = {d["name"] for d in node_attrs.get("default_inputs", list())}
        return not (set(taskclass.required_input_names()) - static_inputs)

    def start_nodes(self) -> Set[NodeIdType]:
        """Nodes without predecessors; when there are none (e.g. cyclic graph),
        fall back to nodes whose required inputs are statically satisfied."""
        nodes = set(
            node_id
            for node_id in self.graph.nodes
            if not self.has_predecessors(node_id)
        )
        if nodes:
            return nodes
        return set(
            node_id
            for node_id in self.graph.nodes
            if self._has_required_static_inputs(node_id)
            and not self._has_required_predecessors(node_id)
        )

    def end_nodes(self) -> Set[NodeIdType]:
        """Nodes without successors; when there are none, fall back to nodes
        whose conditional links do not cover all possible outcomes."""
        nodes = set(
            node_id for node_id in self.graph.nodes if not self.has_successors(node_id)
        )
        if nodes:
            return nodes
        return set(
            node_id
            for node_id in self.graph.nodes
            if self._node_has_noncovered_conditions(node_id)
        )

    def _node_has_noncovered_conditions(self, source_id: NodeIdType) -> bool:
        """Return True when some condition on an outgoing link of *source_id*
        has no complementary condition on a sibling link (i.e. an outcome
        exists for which no link fires)."""
        links = self._get_node_expanded_conditions(source_id)
        # NOTE(review): `links` items are lists of {"source_output", "value"}
        # dicts (see _get_node_expanded_conditions) while
        # _conditions_are_complementary iterates them as varname->value
        # mappings — confirm the expected structure.
        has_complement = [False] * len(links)

        default_complements = {CONDITIONS_ELSE_VALUE}
        complements = {
            CONDITIONS_ELSE_VALUE: None,
            True: {False, CONDITIONS_ELSE_VALUE},
            False: {True, CONDITIONS_ELSE_VALUE},
        }

        for i, conditions1 in enumerate(links):
            if has_complement[i]:
                continue
            # Pair each condition set with a later complementary one.
            for j in range(i + 1, len(links)):
                conditions2 = links[j]
                if self._conditions_are_complementary(
                    conditions1, conditions2, default_complements, complements
                ):
                    has_complement[i] = True
                    has_complement[j] = True
                    break
            if not has_complement[i]:
                return True
        return False

    @staticmethod
    def _conditions_are_complementary(
        conditions1, conditions2, default_complements, complements
    ):
        # Two condition sets are complementary when, for every variable,
        # the second set's value complements the first set's value.
        for varname, value in conditions1.items():
            complementary_values = complements.get(value, default_complements)
            if complementary_values is None:
                # Any value is complementary
                continue
            if conditions2[varname] not in complementary_values:
                return False
        return True

    def _get_node_expanded_conditions(self, source_id: NodeIdType):
        """Ensure that conditional link starting from a source node has
        the same set of output names.
        """
        links = [
            self.graph[source_id][target_id]["conditions"]
            for target_id in self.successors(source_id, link_has_conditions=True)
        ]
        all_condition_names = {
            item["source_output"] for conditions in links for item in conditions
        }
        for conditions in links:
            link_condition_names = {item["source_output"] for item in conditions}
            # Pad missing condition names with the "else" value so every
            # link covers the same set of output names.
            for name in all_condition_names - link_condition_names:
                conditions.append(
                    {"source_output": name, "value": CONDITIONS_ELSE_VALUE}
                )
        return links

    def validate_graph(self):
        """Validate task executables and link attributes.

        :raises KeyError: a data_mapping entry lacks a target input name
        :raises ValueError: two required links feed the same input, or a link
                            combines mutually exclusive attributes
        """
        for node_id, node_attrs in self.graph.nodes.items():
            inittask.validate_task_executable(node_attrs, node_id=node_id)

            # Isolated nodes do no harm so comment this
            # if len(graph.nodes) > 1 and not node_has_links(graph, node_id):
            #    raise ValueError(f"Node {repr(node_id)} has no links")

            # Each target input may be fed by at most one required link.
            inputs_from_required = dict()
            for source_id in self._required_predecessors(node_id):
                link_attrs = self.graph[source_id][node_id]
                arguments = link_attrs.get("data_mapping", list())
                for arg in arguments:
                    try:
                        name = arg["target_input"]
                    except KeyError:
                        raise KeyError(
                            f"Argument '{arg}' of link '{source_id}' -> '{node_id}' is missing an 'input' key"
                        ) from None
                    other_source_id = inputs_from_required.get(name)
                    if other_source_id:
                        raise ValueError(
                            f"Node {repr(source_id)} and {repr(other_source_id)} both connect to the input {repr(name)} of {repr(node_id)}"
                        )
                    inputs_from_required[name] = source_id

        for (source, target), linkattrs in self.graph.edges.items():
            err_msg = (
                f"Link {source}->{target}: '{{}}' and '{{}}' cannot be used together"
            )
            if linkattrs.get("map_all_data") and linkattrs.get("data_mapping"):
                raise ValueError(err_msg.format("map_all_data", "data_mapping"))
            if linkattrs.get("on_error") and linkattrs.get("conditions"):
                raise ValueError(err_msg.format("on_error", "conditions"))

    def topological_sort(self) -> Iterable[NodeIdType]:
        """Sort node names for sequential instantiation+execution of DAGs"""
        if self.is_cyclic:
            raise RuntimeError("Sorting nodes is not possible for cyclic graphs")
        yield from networkx.topological_sort(self.graph)

    def successor_counter(self) -> Dict[NodeIdType, int]:
        """Count the number of outgoing links per node."""
        nsuccessor = Counter()
        for edge in self.graph.edges:
            nsuccessor[edge[0]] += 1
        return nsuccessor

    def execute(
        self,
        varinfo: Optional[dict] = None,
        raise_on_error: Optional[bool] = True,
        results_of_all_nodes: Optional[bool] = False,
    ) -> Dict[NodeIdType, Task]:
        """Sequential execution of DAGs

        :param varinfo: Variable configuration forwarded to task instantiation
        :param raise_on_error: forwarded to each task's `execute`
        :param results_of_all_nodes: keep every task in the returned mapping;
            otherwise intermediate results are evicted once consumed
        :raises RuntimeError: the graph is cyclic or has conditional links
        """
        if self.is_cyclic:
            raise RuntimeError("Cannot execute cyclic graphs")
        if self.has_conditional_links:
            raise RuntimeError("Cannot execute graphs with conditional links")
        if results_of_all_nodes:
            evict_result_counter = None
        else:
            evict_result_counter = self.successor_counter()
        tasks = dict()
        for node_id in self.topological_sort():
            task = self.instantiate_task_static(
                node_id,
                tasks=tasks,
                varinfo=varinfo,
                evict_result_counter=evict_result_counter,
            )
            task.execute(raise_on_error=raise_on_error)
        return tasks