Commit 4d52bde2 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

use node id and label destinction

parent a0bf590e
Pipeline #59081 passed with stages
in 46 seconds
......@@ -13,6 +13,7 @@ from ewokscore import load_graph
from ewokscore.inittask import instantiate_task
from ewokscore.inittask import add_dynamic_inputs
from ewokscore.graph import ewoks_jsonload_hook
from ewokscore.node import get_node_label
logger = logging.getLogger(__name__)
......@@ -26,7 +27,7 @@ def execute_task(execinfo, *inputs):
add_dynamic_inputs(dynamic_inputs, link_attrs, source_results)
task = instantiate_task(
execinfo["node_attrs"],
node_name=execinfo["node_name"],
node_id=execinfo["node_id"],
inputs=dynamic_inputs,
varinfo=execinfo["varinfo"],
)
......@@ -36,23 +37,21 @@ def execute_task(execinfo, *inputs):
except Exception as e:
if execinfo["enable_logging"]:
logger.error(
"\nEXECUTE {} {}\n INPUTS: {}\n ERROR: {}".format(
execinfo["node_name"],
repr(task),
task.input_values,
e,
),
"\nEXECUTE %s %s\n INPUTS: %s\n ERROR: %s",
execinfo["node_label"],
task,
task.input_values,
e,
)
raise
if execinfo["enable_logging"]:
if execinfo["enable_logging"] or True:
logger.info(
"\nEXECUTE {} {}\n INPUTS: {}\n OUTPUTS: {}".format(
execinfo["node_name"],
repr(task),
task.input_values,
task.output_values,
),
"\nEXECUTE %s %s\n INPUTS: %s\n OUTPUTS: %s",
execinfo["node_label"],
task,
task.input_values,
task.output_values,
)
return task.output_transfer_data
......@@ -60,11 +59,15 @@ def execute_task(execinfo, *inputs):
def convert_graph(ewoksgraph, varinfo, enable_logging=False):
daskgraph = dict()
for target, node_attrs in ewoksgraph.graph.nodes.items():
sources = tuple(source for source in ewoksgraph.predecessors(target))
link_attrs = tuple(ewoksgraph.graph[source][target] for source in sources)
for target_id, node_attrs in ewoksgraph.graph.nodes.items():
source_ids = tuple(ewoksgraph.predecessors(target_id))
link_attrs = tuple(
ewoksgraph.graph[source_id][target_id] for source_id in source_ids
)
node_label = get_node_label(node_attrs, node_id=target_id)
execinfo = {
"node_name": target,
"node_id": target_id,
"node_label": node_label,
"node_attrs": node_attrs,
"link_attrs": link_attrs,
"varinfo": varinfo,
......@@ -72,7 +75,7 @@ def convert_graph(ewoksgraph, varinfo, enable_logging=False):
}
# Note: the execinfo is serialized to prevent dask
# from interpreting node names as task results
daskgraph[target] = (execute_task, json.dumps(execinfo)) + sources
daskgraph[target_id] = (execute_task, json.dumps(execinfo)) + source_ids
return daskgraph
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment