Allow defining a Task from a function using introspection and type annotations?
Following yesterday's training, I was curious about this and gave it a try. It is probably limited in some areas and does not cover everything, but here is a proof of concept (POC):
Sample code and demo
from collections.abc import Callable, Iterable
import inspect
from typing import Optional, TypedDict
from ewoks import execute_graph
from ewokscore import Task
def astask(
    func: Callable,
    name: Optional[str] = None,
    output_names: Optional[Iterable[str]] = None,
) -> "type[Task]":
    """Convert a function to an ewoks Task subclass.

    It leverages introspection and type annotations to define inputs and outputs:

    - Parameters without a default value become ``input_names``; parameters
      with a default value become ``optional_input_names``.
    - If ``output_names`` is given, ``func`` must return a mapping, and each
      listed key present in it is copied to the task output of the same name.
    - Otherwise, if the return annotation is a ``dict`` subclass with annotated
      fields (e.g. a ``TypedDict``), its field names are used as output names
      and the returned mapping is unpacked the same way.
    - Otherwise, the returned value is stored in a single ``return_value`` output.

    It can also be used as a decorator.
    Warning: in that case the decorated name is bound to the Task subclass and
    can no longer be called as a function.

    :param func: Function to wrap in a Task
    :param name: Name of the returned Task subclass (defaults to ``func.__name__``)
    :param output_names: Output names to extract from the mapping returned by
        ``func``. A single string is treated as one name.
    :return: A Task subclass wrapping the provided function
    :raises TypeError: If ``func`` takes ``*args`` or ``**kwargs``
    :raises NotImplementedError: If ``func`` has positional-only parameters
    """
    from collections.abc import Mapping  # used to validate unpacked results

    input_names = []
    optional_input_names = []
    # Positional-only parameters are rejected below, so the function is always
    # called with keyword arguments only.
    n_required_positional_inputs = 0
    signature = inspect.signature(func)

    # Get input_names and optional_input_names from the signature
    for param in signature.parameters.values():
        if param.kind == param.VAR_POSITIONAL:
            raise TypeError("Variable positional (e.g., *args) arguments are not supported")
        if param.kind == param.VAR_KEYWORD:
            raise TypeError("Variable keyword (e.g., **kwargs) arguments are not supported")
        if param.kind == param.POSITIONAL_ONLY:
            raise NotImplementedError("Positional only argument are not supported")
        if param.kind not in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY):
            raise RuntimeError(f"Unsupported parameter kind: {param.kind}")
        if param.default is param.empty:
            input_names.append(param.name)
        else:
            optional_input_names.append(param.name)

    if output_names is not None:
        # Use the provided output names. A bare string is one name, not an
        # iterable of one-character names.
        if isinstance(output_names, str):
            output_names = (output_names,)
        output_names = tuple(output_names)
        unpack_output = True
    else:
        # By default, store the whole result as a single "return_value" output
        output_names = ("return_value",)
        unpack_output = False
        # NOTE: postponed (string) annotations are not resolved here, so they
        # always fall back to "return_value".
        annotation = signature.return_annotation
        # Only dict subclasses (e.g. TypedDict) describe the keys of a returned
        # mapping. Checking __annotations__ alone would also accept arbitrary
        # classes, whose instances cannot be unpacked by key. On Python >= 3.10
        # it would even accept un-annotated classes, which lazily report {} and
        # would produce no outputs at all.
        if isinstance(annotation, type) and issubclass(annotation, dict):
            fields = getattr(annotation, "__annotations__", None)
            if isinstance(fields, dict) and fields:
                # TODO allow users to force returning as return_value in this case
                output_names = tuple(fields)
                unpack_output = True

    task_name = func.__name__ if name is None else name

    class FuncTaskMixIn:
        _TASK_FUNC_UNPACK_OUTPUT = unpack_output

        def run(self):
            result = func(**self.input_values)
            if not self._TASK_FUNC_UNPACK_OUTPUT:
                self.outputs.return_value = result
                return
            if not isinstance(result, Mapping):
                raise TypeError(
                    f"{task_name}: expected the wrapped function to return a "
                    f"mapping with keys {tuple(self.output_names())}, "
                    f"got {type(result).__name__}"
                )
            # Outputs missing from the returned mapping are left unset
            for output_name in self.output_names():
                if output_name in result:
                    self.outputs[output_name] = result[output_name]

    namespace = {"__doc__": getattr(func, "__doc__", None)}
    # type() would otherwise take __module__ from the module defining astask.
    # Use the wrapped function's module so "<module>.<name>" points to where
    # the function lives.
    module = getattr(func, "__module__", None)
    if module is not None:
        namespace["__module__"] = module

    # Extra keyword arguments are forwarded to Task.__init_subclass__
    return type(
        task_name,
        (FuncTaskMixIn, Task),
        namespace,
        input_names=input_names,
        optional_input_names=optional_input_names,
        output_names=output_names,
        n_required_positional_inputs=n_required_positional_inputs,
    )
# Examples

# From plain functions: the result is exposed as a single "return_value" output
def add(a, b=0):
    """Return the sum of a and b."""
    total = a + b
    return total


Add = astask(func=add, name="Add")


def sub(a, b=0):
    """Return a minus b."""
    difference = a - b
    return difference


Sub = astask(func=sub, name="Sub")
# With a TypedDict return annotation: each field becomes a task output
class Result(TypedDict):
    quotient: int
    remainder: int


def integer_division(a, b=1) -> Result:
    """Return the quotient and remainder of the floor division a // b."""
    quotient = a // b
    remainder = a % b
    return {"quotient": quotient, "remainder": remainder}


IntDiv = astask(func=integer_division, name="IntDiv")
# As a decorator: the name "Mult" is bound to the generated Task subclass,
# not to a callable function.
@astask
def Mult(a: int, b: int) -> int:
    """Return the product of a and b."""
    product = a * b
    return product
# Workflow: ((add_a+add_b) - (div_a // div_b)) * (div_a % div_b)
# Tasks are referenced as "__main__.<ClassName>", so this demo is meant to be
# run as a script; execution is guarded so importing this module has no
# side effects.
nodes = [
    {
        "id": "add",
        "task_type": "class",
        "task_identifier": "__main__.Add",
    },
    {
        "id": "sub",
        "task_type": "class",
        "task_identifier": "__main__.Sub",
    },
    {
        "id": "div",
        "task_type": "class",
        "task_identifier": "__main__.IntDiv",
    },
    {
        "id": "mult",
        "task_type": "class",
        "task_identifier": "__main__.Mult",
    },
]
links = [
    {
        "source": "add",
        "target": "sub",
        "data_mapping": [{"source_output": "return_value", "target_input": "a"}],
    },
    {
        "source": "div",
        "target": "sub",
        "data_mapping": [{"source_output": "quotient", "target_input": "b"}],
    },
    {
        "source": "div",
        "target": "mult",
        "data_mapping": [{"source_output": "remainder", "target_input": "a"}],
    },
    {
        "source": "sub",
        "target": "mult",
        "data_mapping": [{"source_output": "return_value", "target_input": "b"}],
    },
]
workflow = {"graph": {"id": "testworkflow"}, "nodes": nodes, "links": links}

# Define task inputs
add_a, add_b, div_a, div_b = 10, 5, 11, 5
inputs = [
    {"id": "add", "name": "a", "value": add_a},
    {"id": "add", "name": "b", "value": add_b},
    {"id": "div", "name": "a", "value": div_a},
    {"id": "div", "name": "b", "value": div_b},
]

if __name__ == "__main__":
    result = execute_graph(workflow, inputs=inputs)
    print(result)
    print("expected:", ((add_a + add_b) - (div_a // div_b)) * (div_a % div_b))
One thing that could be improved: when `astask` is used as a decorator, the decorated name is no longer a function. With some integration into the task-loading mechanism, the decorator could instead be:
Sample code
def decorator(func=None, output_names: Optional[Iterable[str]] = None):
    """Attach an ewoks Task built from ``func`` while keeping ``func`` callable.

    Usable both as ``@decorator`` and as ``@decorator(output_names=[...])``.
    Decorator syntax passes only the function, so ``output_names`` can only be
    provided through the parametrized form.

    :param func: Function to wrap; omit it to get a configured decorator
    :param output_names: Output names forwarded to ``astask``
    :return: ``func`` itself, with a ``_ewoks_task`` attribute holding the Task
        subclass, or a decorator when ``func`` is omitted
    """

    def attach(f):
        f._ewoks_task = astask(f, output_names=output_names)
        return f

    if func is None:
        return attach
    return attach(func)


@decorator
def noop(a):
    return a
or:
Sample code
# Task subclasses generated by `decorator`, keyed by the bare function name.
# NOTE(review): two decorated functions with the same __name__ (e.g. from
# different modules) overwrite each other here.
FUNC_MAPPING = {}


def decorator(func=None, output_names: Optional[Iterable[str]] = None):
    """Register an ewoks Task built from ``func`` while keeping ``func`` callable.

    Usable both as ``@decorator`` and as ``@decorator(output_names=[...])``.
    Decorator syntax passes only the function, so ``output_names`` can only be
    provided through the parametrized form.

    :param func: Function to wrap; omit it to get a configured decorator
    :param output_names: Output names forwarded to ``astask``
    :return: ``func`` unchanged (its Task subclass is stored in
        ``FUNC_MAPPING[func.__name__]``), or a decorator when ``func`` is omitted
    """

    def register(f):
        FUNC_MAPPING[f.__name__] = astask(f, output_names=output_names)
        return f

    if func is None:
        return register
    return register(func)


@decorator
def noop(a):
    return a
In both cases, the function remains a regular, callable function and can also be used as a Task.
If you think this is of interest, I can rework it into a proper PR.
Edited by Thomas Vincent