ewokscore issues
https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues
2024-01-19T08:47:28+01:00

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/46
Allow to define a Task from a function using introspection and typing?
2024-01-19T08:47:28+01:00
Thomas Vincent

Following yesterday's training, I was curious about it and so I gave it a try.
It's probably limited in some areas and I didn't do it all, but here is a POC:
<details><summary>Sample code and demo</summary>
```python
from collections.abc import Callable, Iterable
import inspect
from typing import Optional, TypedDict

from ewoks import execute_graph
from ewokscore import Task


def astask(
    func: Callable,
    name: Optional[str]=None,
    output_names: Optional[Iterable[str]]=None,
) -> Task:
    """Convert a function to an ewoks Task.

    It leverages introspection and type annotation to set input and outputs.

    It can also be used as a decorator.
    Warning: In this case, it becomes a class and can no longer be used as a function...

    :param func: Function to wrap in a Task
    :param name: Name of the returned Task subclass
    :param output_names: List of output_names to extract from function returned value
    :return: A Task subclass wrapping the provided function
    """
    input_names = []
    optional_input_names = []
    n_required_positional_inputs = 0

    signature = inspect.signature(func)

    # Get input_names and optional_input_names from signature
    for param in signature.parameters.values():
        if param.kind == param.VAR_POSITIONAL:
            raise TypeError("Variable positional (e.g., *args) arguments are not supported")
        if param.kind == param.VAR_KEYWORD:
            raise TypeError("Variable keyword (e.g., **kwargs) arguments are not supported")
        if param.kind == param.POSITIONAL_ONLY:
            raise NotImplementedError("Positional-only arguments are not supported")
        if param.kind not in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY):
            raise RuntimeError(f"Unsupported parameter kind: {param.kind}")

        is_optional = param.default is not param.empty
        if is_optional:
            optional_input_names.append(param.name)
        else:
            input_names.append(param.name)

    if output_names is not None:  # Use provided output_names
        unpack_output = True
        output_names = tuple(output_names)
    else:
        # Return result as return_value if return type annotation cannot be used
        unpack_output = False
        output_names = ('return_value',)

        if signature.return_annotation is not signature.empty:
            annotation = getattr(signature.return_annotation, '__annotations__', None)
            if isinstance(annotation, dict):
                # Unpack returned value with known type
                # TODO allow users to force returning as return_value in this case
                output_names = tuple(annotation.keys())
                unpack_output = True

    class FuncTaskMixIn:
        _TASK_FUNC_UNPACK_OUTPUT = unpack_output

        def run(self):
            result = func(**self.input_values)
            if not self._TASK_FUNC_UNPACK_OUTPUT:
                self.outputs.return_value = result
                return

            for name in self.output_names():
                if name in result:
                    self.outputs[name] = result[name]

    return type(
        func.__name__ if name is None else name,
        (FuncTaskMixIn, Task),
        {},
        input_names=input_names,
        optional_input_names=optional_input_names,
        output_names=output_names,
        n_required_positional_inputs=n_required_positional_inputs,
    )


# Examples

# From function
def add(a, b=0):
    return a+b

Add = astask(add, name='Add')

def sub(a, b=0):
    return a-b

Sub = astask(sub, name='Sub')

# With return type
class Result(TypedDict):
    quotient: int
    remainder: int

def integer_division(a, b=1) -> Result:
    return {'quotient': a // b, 'remainder': a % b}

IntDiv = astask(integer_division, name='IntDiv')

# As decorator
@astask
def Mult(a: int, b: int) -> int:
    return a*b

# Workflow: ((add_a+add_b) - (div_a // div_b)) * (div_a % div_b)
nodes = [
    {
        "id": "add",
        "task_type": "class",
        "task_identifier": "__main__.Add",
    },
    {
        "id": "sub",
        "task_type": "class",
        "task_identifier": "__main__.Sub",
    },
    {
        "id": "div",
        "task_type": "class",
        "task_identifier": "__main__.IntDiv",
    },
    {
        "id": "mult",
        "task_type": "class",
        "task_identifier": "__main__.Mult",
    },
]
links = [
    {
        "source": "add",
        "target": "sub",
        "data_mapping": [{"source_output": "return_value", "target_input": "a"}],
    },
    {
        "source": "div",
        "target": "sub",
        "data_mapping": [{"source_output": "quotient", "target_input": "b"}],
    },
    {
        "source": "div",
        "target": "mult",
        "data_mapping": [{"source_output": "remainder", "target_input": "a"}],
    },
    {
        "source": "sub",
        "target": "mult",
        "data_mapping": [{"source_output": "return_value", "target_input": "b"}],
    },
]
workflow = {"graph": {"id": "testworkflow"}, "nodes": nodes, "links": links}

# Define task inputs
add_a, add_b, div_a, div_b = 10, 5, 11, 5
inputs = [
    {"id": "add", "name": "a", "value": add_a},
    {"id": "add", "name": "b", "value": add_b},
    {"id": "div", "name": "a", "value": div_a},
    {"id": "div", "name": "b", "value": div_b},
]

result = execute_graph(workflow, inputs=inputs)
print(result)
print('expected:', ((add_a+add_b) - (div_a // div_b)) * (div_a % div_b))
```
</details>
One thing that can be improved is that, when `astask` is used as a decorator, the decorated name is no longer a function.
Maybe, with some integration into the task loading, the decorator could be:
<details><summary>Sample code</summary>
```python
def decorator(func, output_names: Optional[Iterable[str]]=None):
    func._ewoks_task = astask(func, output_names=output_names)
    return func

@decorator
def noop(a):
    return a
```
</details>
or:
<details><summary>Sample code</summary>
```python
FUNC_MAPPING = {}

def decorator(func, output_names: Optional[Iterable[str]]=None):
    FUNC_MAPPING[func.__name__] = astask(func, output_names=output_names)
    return func

@decorator
def noop(a):
    return a
```
</details>
and then the function is both a proper function and can be used as a Task.
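
One limitation of both variants is that `output_names` cannot be passed through the `@` syntax. A minimal sketch of a decorator that works both bare and with arguments is shown below. It assumes a registry similar to `FUNC_MAPPING`; `task`, `TASK_REGISTRY` and `make_task` are hypothetical names, and `make_task` is only a stand-in for `astask` so that the sketch runs without ewokscore.

```python
from typing import Callable, Iterable, Optional

TASK_REGISTRY = {}  # hypothetical registry, plays the role of FUNC_MAPPING


def make_task(func, output_names=None):
    """Stand-in for astask() so that this sketch runs without ewokscore."""
    return {"func": func, "output_names": tuple(output_names or ("return_value",))}


def task(func: Optional[Callable] = None, *, output_names: Optional[Iterable[str]] = None):
    """Register a function as a task; usable as @task or @task(output_names=...)."""

    def register(f):
        TASK_REGISTRY[f.__name__] = make_task(f, output_names=output_names)
        return f  # the original function is returned unchanged

    if func is None:
        # Used with arguments: @task(output_names=...)
        return register
    # Used bare: @task
    return register(func)


@task
def noop(a):
    return a


@task(output_names=["quotient", "remainder"])
def integer_division(a, b=1):
    return {"quotient": a // b, "remainder": a % b}
```

Both `noop` and `integer_division` remain plain callables, while the registry holds the task-side description for each of them.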
If you think that's of interest, I can rework it into a proper PR.

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/49
Rename universal hashing to deterministic hashing
2024-01-18T21:07:14+01:00
Wout De Nolf

deterministic hashing: stable across interpreter versions and implementations
universal hashing: selecting a hash function at random from a family of hash functions with a certain mathematical property with the purpose of guaranteeing a low number of collisions.
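
As an illustration of the first definition only, here is a minimal sketch of a deterministic hash built on the standard library. It is an assumption for illustration and not necessarily how ewokscore computes its hashes; `deterministic_hash` is a hypothetical name. By contrast, Python's built-in `hash()` of a string is salted per process by default, so it is not stable between runs.

```python
import hashlib
import json


def deterministic_hash(obj) -> str:
    """Return a SHA-256 hex digest of a JSON-serializable object.

    Keys are sorted and separators are fixed, so the digest does not
    depend on dictionary ordering or on the interpreter that produced it.
    """
    payload = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Same content, different key order: the digests are identical
first = deterministic_hash({"a": 1, "b": [1, 2]})
second = deterministic_hash({"b": [1, 2], "a": 1})
print(first == second)
```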
What I meant to achieve is the first thing, but I wrongly called it the second.

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/55
add warning for unused input data
2024-01-18T21:06:04+01:00
payno

According to the training slides, `Nodes that receive an input they don't have will silently ignore it.`
I think it would be good to avoid ignoring inputs silently.
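
A minimal sketch of what such a check could look like, using only the standard `logging` module. The function name, its arguments and the message format are assumptions for illustration; this is not existing ewokscore code.

```python
import logging

logger = logging.getLogger(__name__)


def warn_unused_inputs(node_id, provided, accepted):
    """Log a warning for every provided input name the node does not declare.

    Returns the sorted list of unused names so callers can also act on it.
    """
    unused = sorted(set(provided) - set(accepted))
    for name in unused:
        logger.warning(
            "Node %r ignores input %r (accepted inputs: %s)",
            node_id,
            name,
            ", ".join(sorted(accepted)),
        )
    return unused


# A typo ("bb" instead of "b") is reported instead of being silently dropped
print(warn_unused_inputs("add", {"a": 1, "bb": 2}, ["a", "b"]))
```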
Maybe some logging could be added in a `debug-mode` or at a specific `log-level`. But from experience, if users make 'dummy' typos, they may simply complain that the workflow is not processing as expected, and/or we might end up spending (significant) time on needless debugging.

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/56
load_graph does not work with pathlib.Path object
2024-01-16T14:47:33+01:00
Olof Svensson

The following code does not work if task_graph_path is a pathlib.Path() object:
```plaintext
graph = load_graph(task_graph_path, root_dir=flows_dir)
```
The following code works:
```plaintext
graph = load_graph(str(task_graph_path), root_dir=flows_dir)
```

Giannis Koumoutsos

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/43
Flag to force reset ewoks event handlers
2023-12-22T15:13:03+01:00
Wout De Nolf

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/41
Ewoks events handlers: change handlers when execinfo has different handlers
2023-12-22T15:13:03+01:00
Wout De Nolf

Wout De Nolf

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/54
Add all execution parameters as payload to the job start event
2023-10-05T14:40:27+02:00
Wout De Nolf

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/53
`convert_graph` adds fields not present in the spec
2023-10-05T14:37:42+02:00
Loic Huder

When creating a graph with `convert_graph` (as shown in https://workflow.gitlab-pages.esrf.fr/ewokstutorials/ewoksfordevs/#/57), some fields are added to the JSON fields:
##### In root
- `directed`
- `multigraph`
##### In `graph`
- `schema_version`
These are not part of the definition given in https://ewokscore.readthedocs.io/en/latest/definitions.html. For example, `ewoksweb` does not support them.
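
For consumers that do not support these fields, one possible workaround is to drop them before passing the JSON on. The sketch below is only an illustration: `strip_extra_fields` is a hypothetical helper, not part of ewokscore, and the sample dictionary is made up.

```python
ROOT_EXTRAS = ("directed", "multigraph")
GRAPH_EXTRAS = ("schema_version",)


def strip_extra_fields(graph_dict):
    """Return a copy of graph_dict without the fields listed above."""
    cleaned = {k: v for k, v in graph_dict.items() if k not in ROOT_EXTRAS}
    if "graph" in cleaned:
        cleaned["graph"] = {
            k: v for k, v in cleaned["graph"].items() if k not in GRAPH_EXTRAS
        }
    return cleaned


# Made-up example of a converted graph
converted = {
    "directed": True,
    "multigraph": False,
    "graph": {"id": "testworkflow", "schema_version": "1.0"},
    "nodes": [],
    "links": [],
}
print(strip_extra_fields(converted))
```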
Should we make the spec evolve?

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/47
Request to save the full Task location
2023-06-01T21:16:42+02:00
payno

Some users think that having the full identifier as id might not be enough and would like the full Task location (`module.__file__`) to be stored somewhere. I am not sure this is useful, but it can be comforting to users and may be a good debugging aid when some environments are a bit dirty.

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/40
Optimize graph analysis
2023-03-10T14:04:41+01:00
Wout De Nolf

src/ewokscore/graph/analysis.py
The graph iteration methods traverse the graph multiple times, and the time increases non-linearly with the number of nodes. A single iteration should be possible.

Loic Huder

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/38
Recursive task discovery
2023-01-26T13:10:49+01:00
Wout De Nolf

Wout De Nolf

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/45
Restoring Variable data from HDF5
2022-10-28T17:00:48+02:00
Wout De Nolf

Type changes can be problematic:
https://gitlab.esrf.fr/workflow/ewoksapps/ewoksxrpd/-/blob/main/src/ewoksxrpd/tasks/utils/data_utils.py#L14

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/35
Expand versioning to graphs and nodes
2022-06-09T12:09:57+02:00
Wout De Nolf

## What we already have
We can currently specify the version of the graph language to describe a workflow in the workflow description
```json
{"graph": {"version": "1.0"}}
```
and we have a version for a workflow task implementation
```python
class MyTask(Task, version="1.2"):
    ...
```
which is used in the hashing scheme.
## What we should add
We need to add required task versioning and graph versioning. What I propose:
```json
{
    "graph": {"version": "1.0", "graph_version": "1.4"},
    "nodes": [
        {"id": "task1", "task_identifier": "myproject.MyTask", "task_version": ">=1.1,<2"}
    ]
}
```
The `"task_version"` will be used by ewoks to check the version of the task and raise an exception if it does not match. In this example it will check whether the current MyTask version `1.2` satisfies `>=1.1,<2` (which it does in this case). The version pattern can be anything that pip, for example, would accept.
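
To make the proposed check concrete, here is a simplified, self-contained sketch. It only handles dotted numeric versions and the operators `>=`, `<=`, `==`, `!=`, `>` and `<`; real code would more likely rely on the `packaging` library (`packaging.specifiers.SpecifierSet`), which implements the full pip syntax. `satisfies` and `check_task_version` are hypothetical names, not ewokscore API.

```python
import operator
import re

_OPERATORS = {
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    "<": operator.lt,
}
_CLAUSE = re.compile(r"\s*(>=|<=|==|!=|>|<)\s*(\d+(?:\.\d+)*)\s*")


def _parse(version):
    """Turn "1.2" into (1, 2)."""
    return tuple(int(part) for part in version.split("."))


def satisfies(version, spec):
    """True if `version` meets every comma-separated clause of `spec`."""
    for clause in spec.split(","):
        match = _CLAUSE.fullmatch(clause)
        if match is None:
            raise ValueError(f"Unsupported version clause: {clause!r}")
        op, bound = match.groups()
        left, right = _parse(version), _parse(bound)
        # Pad with zeros so that "2" and "2.0" compare as equal
        width = max(len(left), len(right))
        left += (0,) * (width - len(left))
        right += (0,) * (width - len(right))
        if not _OPERATORS[op](left, right):
            return False
    return True


def check_task_version(task_identifier, actual, required):
    """Raise if the task version does not satisfy the node's requirement."""
    if not satisfies(actual, required):
        raise RuntimeError(
            f"{task_identifier} has version {actual}, which does not satisfy {required!r}"
        )


check_task_version("myproject.MyTask", "1.2", ">=1.1,<2")  # passes silently
```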
The `"graph_version"` is not used by ewoks but by any version control system we use. Not sure about this one. If we use git for example, the version does not need to be inside the workflow description itself (it's a git tag for example).
@payno @svensson @koumouts @andy.gotz What do you think?

Wout De Nolf

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/34
yield typing is Iterator, not Iterable
2022-05-25T13:12:54+02:00
Wout De Nolf

Wout De Nolf

https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/issues/27
Logger injected into all tasks
2022-05-05T14:41:08+02:00
Stuart Fisher

> @denolf
> I discussed with @payno and you should make another issue for this and describe the use case. Strictly speaking python
> logging is not ewoks' business. But it would be easy to use the `execinfo` mechanism that registers handlers to the ewoks
> event logger for the root logger as well (e.g. provide a flag which says this handler is only for ewoks events or for all
> python logging).
Following on from #12, it would be nice if we could reuse the `execinfo` mechanism to inject a logging handler into all tasks, with at least job_id populated as extra metadata. This makes it possible to track processing log messages in a single location, even when they originate from multiple beamlines and are executed on multiple machines.
This is how things are currently set up with zocalo: a graylog handler is injected into each task of the graph. Maybe my use case is weird and the rest of you have other ways to handle this @svensson @payno ?