Commit 4d01bd57 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Merge branch '16-upload-to-icat' into 'main'

Resolve "Upload to ICAT"

Closes #16

See merge request !58
parents 41f51ea0 578c5038
Pipeline #84455 passed with stages
in 3 minutes and 42 seconds
......@@ -52,6 +52,13 @@ def convert_and_execute_workflow(self, *args, **kwargs) -> Dict:
return tasks.convert_and_execute_graph(*args, **kwargs)
@app.task(bind=True)
@_add_job_id
@_add_working_directory
def execute_and_upload_workflow(self, *args, **kwargs) -> Dict:
return tasks.execute_and_upload_graph(*args, **kwargs)
@app.task()
@_add_working_directory
def discover_tasks_from_modules(*args, **kwargs) -> List[dict]:
......
......@@ -9,6 +9,7 @@ __all__ = [
"convert_workflow",
"convert_and_trigger_workflow",
"convert_and_trigger_test_workflow",
"trigger_and_upload_workflow",
"discover_tasks_from_modules",
]
......@@ -39,6 +40,11 @@ def convert_and_trigger_workflow(**kwargs) -> AsyncResult:
return send_task("ewoksjob.apps.ewoks.convert_and_execute_workflow", **kwargs)
@requires_config
def trigger_and_upload_workflow(**kwargs) -> AsyncResult:
return send_task("ewoksjob.apps.ewoks.execute_and_upload_workflow", **kwargs)
def convert_and_trigger_test_workflow(seconds=0, args=None, **kwargs) -> AsyncResult:
if len(args) != 1:
raise TypeError(
......
......@@ -20,6 +20,7 @@ __all__ = [
"convert_workflow",
"convert_and_trigger_workflow",
"convert_and_trigger_test_workflow",
"trigger_and_upload_workflow",
"discover_tasks_from_modules",
]
......@@ -58,6 +59,13 @@ def convert_and_trigger_workflow(
return _submit_with_jobid(tasks.convert_and_execute_graph, args=args, kwargs=kwargs)
@_requires_ewoks
def trigger_and_upload_workflow(
args: Optional[Tuple] = tuple(), kwargs: Optional[Mapping] = None
) -> Future:
return _submit_with_jobid(tasks.execute_and_upload_graph, args=args, kwargs=kwargs)
def trigger_test_workflow(seconds=0, kwargs: Optional[Mapping] = None) -> Future:
args = (test_workflow(),)
if kwargs is None:
......
......@@ -2,3 +2,4 @@
"""
from .convert_and_execute import * # noqa F403
from .execute_and_upload import * # noqa F403
......@@ -19,6 +19,4 @@ def convert_and_execute_graph(
load_options=load_options,
save_options=save_options,
)
if workflow is None: # TODO: deprecated, never returns None
workflow = destination
return ewoks.execute_graph(workflow, **kwargs)
import ewoks
try:
from pyicat_plus.client.main import IcatClient
except ImportError:
IcatClient = None
from typing import Dict, List, Optional
__all__ = ["execute_and_upload_graph"]
def execute_and_upload_graph(
source,
inputs: Optional[List[dict]] = None,
load_options: Optional[List[dict]] = None,
upload_kwargs: Optional[Dict] = None,
**kwargs
) -> Dict:
workflow = ewoks.convert_graph(
source,
None,
inputs=inputs,
load_options=load_options,
save_options={"representation": "json_string"},
)
result = ewoks.execute_graph(workflow, **kwargs)
if IcatClient is None:
raise RuntimeError("requires pyicat-plus")
if upload_kwargs is None:
upload_kwargs = dict()
metadata_urls = upload_kwargs.pop(
"metadata_urls", ["bcu-mq-01.esrf.fr:61613", "bcu-mq-02.esrf.fr:61613"]
)
client = IcatClient(metadata_urls=metadata_urls)
client.store_processed_data(**upload_kwargs)
return result
import time
import pytest
from ewokscore.tests.examples.graphs import get_graph
from ..client import celery
from ..client import process
from .utils import get_result
def test_submit(ewoks_worker):
assert_submit(celery)
def test_submit_local(local_ewoks_worker):
assert_submit(process)
def assert_submit(mod):
graph, expected = get_graph("acyclic1")
expected = expected["task6"]
kwargs = {
"upload_kwargs": {
"metadata_urls": list(),
"beamline": "id00",
"proposal": f"id00{time.strftime('%y%m')}",
"dataset": "testdataset",
"path": "/path/to/processed/dataset",
"sample": "testsample",
"raw": "/path/to/raw/dataset",
}
}
future1 = mod.trigger_and_upload_workflow(args=(graph,), kwargs=kwargs)
with pytest.raises(RuntimeError, match="requires pyicat-plus"):
get_result(future1, timeout=3)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment