Commit 0b536477 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Merge branch 'fix_logging' into 'main'

no explicit logging needed

See merge request !17
parents 248995d8 98e0cd26
Pipeline #61715 passed with stages
in 2 minutes and 6 seconds
...@@ -3,7 +3,6 @@ https://docs.dask.org/en/latest/scheduler-overview.html ...@@ -3,7 +3,6 @@ https://docs.dask.org/en/latest/scheduler-overview.html
""" """
import json import json
import logging
from typing import Optional from typing import Optional
from dask.distributed import Client from dask.distributed import Client
from dask.threaded import get as multithreading_scheduler from dask.threaded import get as multithreading_scheduler
...@@ -17,9 +16,6 @@ from ewokscore.graph import ewoks_jsonload_hook ...@@ -17,9 +16,6 @@ from ewokscore.graph import ewoks_jsonload_hook
from ewokscore.node import get_node_label from ewokscore.node import get_node_label
logger = logging.getLogger(__name__)
def execute_task(execinfo, *inputs): def execute_task(execinfo, *inputs):
execinfo = json.loads(execinfo, object_pairs_hook=ewoks_jsonload_hook) execinfo = json.loads(execinfo, object_pairs_hook=ewoks_jsonload_hook)
...@@ -33,27 +29,7 @@ def execute_task(execinfo, *inputs): ...@@ -33,27 +29,7 @@ def execute_task(execinfo, *inputs):
varinfo=execinfo["varinfo"], varinfo=execinfo["varinfo"],
) )
try: task.execute()
task.execute()
except Exception as e:
if execinfo["enable_logging"]:
logger.error(
"\nEXECUTE %s %s\n INPUTS: %s\n ERROR: %s",
execinfo["node_label"],
task,
task.input_values,
e,
)
raise
if execinfo["enable_logging"] or True:
logger.info(
"\nEXECUTE %s %s\n INPUTS: %s\n OUTPUTS: %s",
execinfo["node_label"],
task,
task.input_values,
task.output_values,
)
return task.output_transfer_data return task.output_transfer_data
...@@ -79,7 +55,6 @@ def convert_graph(ewoksgraph, **execute_options): ...@@ -79,7 +55,6 @@ def convert_graph(ewoksgraph, **execute_options):
def execute_graph( def execute_graph(
graph, graph,
scheduler=None, scheduler=None,
log_task_execution: Optional[bool] = False,
results_of_all_nodes: Optional[bool] = False, results_of_all_nodes: Optional[bool] = False,
load_options: Optional[dict] = None, load_options: Optional[dict] = None,
**execute_options **execute_options
...@@ -91,9 +66,7 @@ def execute_graph( ...@@ -91,9 +66,7 @@ def execute_graph(
raise RuntimeError("Dask can only execute DAGs") raise RuntimeError("Dask can only execute DAGs")
if ewoksgraph.has_conditional_links: if ewoksgraph.has_conditional_links:
raise RuntimeError("Dask cannot handle conditional links") raise RuntimeError("Dask cannot handle conditional links")
daskgraph = convert_graph( daskgraph = convert_graph(ewoksgraph, **execute_options)
ewoksgraph, enable_logging=log_task_execution, **execute_options
)
if results_of_all_nodes: if results_of_all_nodes:
nodes = list(ewoksgraph.graph.nodes) nodes = list(ewoksgraph.graph.nodes)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment