diff --git a/ewoksjob/apps/ewoks.py b/ewoksjob/apps/ewoks.py
index a9385a308eedab300a6a74d71c2252f07ef6e51a..97fdb029e52b20ec6bdc3babb8370f0d3963257a 100644
--- a/ewoksjob/apps/ewoks.py
+++ b/ewoksjob/apps/ewoks.py
@@ -1,15 +1,22 @@
+from typing import Dict, List
 import celery
 import ewoks
-from .config import configure_app
+from ewokscore import task_discovery
+from ..config import configure_app
 
 app = celery.Celery("ewoks")
 configure_app(app)
 
 
 @app.task(bind=True)
-def execute_graph(self, *args, execinfo=None, **kwargs):
+def execute_workflow(self, *args, execinfo=None, **kwargs) -> Dict:
     if execinfo is None:
         execinfo = dict()
     if "job_id" not in execinfo:
         execinfo["job_id"] = self.request.id
     return ewoks.execute_graph(*args, execinfo=execinfo, **kwargs)
+
+
+@app.task()
+def discover_tasks_from_modules(*args, **kwargs) -> List[dict]:
+    return list(task_discovery.discover_tasks_from_modules(*args, **kwargs))
diff --git a/ewoksjob/client.py b/ewoksjob/client.py
deleted file mode 100644
index c5eb10ea5e7b6de2b65d41eb1bf4fe005ac9de05..0000000000000000000000000000000000000000
--- a/ewoksjob/client.py
+++ /dev/null
@@ -1,65 +0,0 @@
-from typing import Optional, Tuple
-from concurrent.futures import Future
-
-from celery.execute import send_task
-from celery.result import AsyncResult
-
-try:
-    from . import server
-except ImportError as e:
-    server = None
-    server_import_error = e
-
-_EWOKS_TASK = "ewoksjob.apps.ewoks.execute_graph"
-
-
-def submit(**kwargs) -> Tuple[AsyncResult, int]:
-    return send_task(_EWOKS_TASK, **kwargs)
-
-
-def submit_local(**kwargs) -> Tuple[Future, int]:
-    """'Local' means that the worker pool runs in the same
-    process as the client.
-    """
-    if server is None:
-        raise ImportError(server_import_error)
-    pool = server.active_workflow_worker_pool()
-    if pool is None:
-        raise RuntimeError("No worker pool is available")
-    return pool.submit(**kwargs)
-
-
-def get_future(job_id) -> Optional[AsyncResult]:
-    return AsyncResult(job_id)
-
-
-def get_local_future(job_id) -> Optional[Future]:
-    if server is None:
-        raise ImportError(server_import_error)
-    pool = server.active_workflow_worker_pool()
-    return pool.get_future(job_id)
-
-
-def cancel(job_id):
-    future = get_future(job_id)
-    if future is not None:
-        future.revoke()
-
-
-def cancel_local(job_id):
-    future = get_local_future(job_id)
-    if future is not None:
-        future.cancel()
-
-
-def get_result(job_id, **kwargs):
-    kwargs.setdefault("interval", 0.1)
-    future = AsyncResult(job_id)
-    if future is not None:
-        return future.get(**kwargs)
-
-
-def get_local_result(job_id, **kwargs):
-    future = get_local_future(job_id)
-    if future is not None:
-        return future.result(**kwargs)
diff --git a/ewoksjob/client/__init__.py b/ewoksjob/client/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..6cb4205376a0ed8560aa29c9c3812f380b193a25
--- /dev/null
+++ b/ewoksjob/client/__init__.py
@@ -0,0 +1 @@
+from .celery import *  # noqa F403
diff --git a/ewoksjob/client/celery/__init__.py b/ewoksjob/client/celery/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..76e5b1c91ef6381df6cd2dfa156b5375430d05b0
--- /dev/null
+++ b/ewoksjob/client/celery/__init__.py
@@ -0,0 +1,6 @@
+"""Remote worker pool managed by Celery
+"""
+from .tasks import *  # noqa F403
+from .utils import *  # noqa F403
+from .tasks import trigger_workflow as submit  # noqa F401
+from .tasks import trigger_test_workflow as submit_test  # noqa F401
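For orientation, a minimal usage sketch of the reorganized Celery client (assuming a configured broker/result backend and a running worker that serves `ewoksjob.apps.ewoks`; the one-node `demo` graph and its inputs are hypothetical):

```python
# Sketch: submit a workflow through the Celery-backed client.
# Assumes a configured broker/result backend and a running worker.
from ewoksjob.client import submit, get_result

# Hypothetical one-node workflow: numpy.add(1, 2)
workflow = {
    "graph": {"id": "demo", "version": "1.0"},
    "nodes": [{"id": "add", "task_type": "method", "task_identifier": "numpy.add"}],
}
inputs = [
    {"id": "add", "name": 0, "value": 1},
    {"id": "add", "name": 1, "value": 2},
]

future = submit(args=(workflow,), kwargs={"inputs": inputs})
print(get_result(future.task_id, timeout=3))  # {"return_value": 3}
```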
diff --git a/ewoksjob/client/celery/tasks.py b/ewoksjob/client/celery/tasks.py
new file mode 100644
index 0000000000000000000000000000000000000000..fabe43504add9dd618b8c2cd5eae6de8af17af5c
--- /dev/null
+++ b/ewoksjob/client/celery/tasks.py
@@ -0,0 +1,20 @@
+from celery.execute import send_task
+from celery.result import AsyncResult
+from ..test_workflow import test_workflow
+
+__all__ = ["trigger_workflow", "trigger_test_workflow", "discover_tasks_from_modules"]
+
+
+def trigger_workflow(**kwargs) -> AsyncResult:
+    return send_task("ewoksjob.apps.ewoks.execute_workflow", **kwargs)
+
+
+def trigger_test_workflow(seconds=0) -> AsyncResult:
+    return trigger_workflow(
+        args=(test_workflow(),),
+        kwargs={"inputs": [{"id": "sleepnode", "name": 0, "value": seconds}]},
+    )
+
+
+def discover_tasks_from_modules(**kwargs) -> AsyncResult:
+    return send_task("ewoksjob.apps.ewoks.discover_tasks_from_modules", **kwargs)
diff --git a/ewoksjob/client/celery/utils.py b/ewoksjob/client/celery/utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..b27e86a8ee14cc01122781219601c39d80320367
--- /dev/null
+++ b/ewoksjob/client/celery/utils.py
@@ -0,0 +1,64 @@
+import logging
+from functools import wraps
+from typing import Optional
+from celery import current_app
+from celery.result import AsyncResult
+from ...config import configure_app
+
+__all__ = ["get_future", "cancel", "get_result", "get_running"]
+
+
+logger = logging.getLogger(__name__)
+
+
+def requires_config(method):
+    @wraps(method)
+    def wrapper(*args, **kwargs):
+        if not current_app.conf.broker_url:
+            configure_app(current_app)
+        return method(*args, **kwargs)
+
+    return wrapper
+
+
+@requires_config
+def get_future(task_id) -> Optional[AsyncResult]:
+    return AsyncResult(task_id)
+
+
+@requires_config
+def cancel(task_id):
+    future = get_future(task_id)
+    if future is not None:
+        future.revoke()
+
+
+@requires_config
+def get_result(task_id, **kwargs):
+    kwargs.setdefault("interval", 0.1)
+    future = AsyncResult(task_id)
+    if future is not None:
+        return future.get(**kwargs)
+
+
+@requires_config
+def get_running():
+    inspect = current_app.control.inspect()
+    task_ids = list()
+
+    workers = inspect.active()
+    if workers is None:
+        logger.warning("No Celery workers were detected")
+        workers = dict()
+    for tasks in workers.values():
+        for task in tasks:
+            task_ids.append(task["id"])
+
+    workers = inspect.scheduled()
+    if workers is None:
+        workers = dict()
+    for tasks in workers.values():
+        for task in tasks:
+            task_ids.append(task["id"])
+
+    return task_ids
diff --git a/ewoksjob/client/process/__init__.py b/ewoksjob/client/process/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3e9eed0408412e5a19c97fc9a2977c27d44450e8
--- /dev/null
+++ b/ewoksjob/client/process/__init__.py
@@ -0,0 +1,7 @@
+"""Client side process pool
+"""
+from .tasks import *  # noqa F403
+from .utils import *  # noqa F403
+from .pool import *  # noqa F403
+from .tasks import trigger_workflow as submit  # noqa F401
+from .tasks import trigger_test_workflow as submit_test  # noqa F401
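The monitoring helpers above can be combined into a crude "revoke everything" loop; a sketch, assuming a configured broker and at least one reachable worker (note that `get_running` merges *active* and *scheduled* task ids):

```python
# Sketch: list active/scheduled job ids and ask the workers to revoke them.
from ewoksjob.client.celery import cancel, get_future, get_running

for task_id in get_running():
    cancel(task_id)  # revoke() on the underlying AsyncResult
    print(task_id, get_future(task_id).status)
```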
diff --git a/ewoksjob/client/process/pool.py b/ewoksjob/client/process/pool.py
new file mode 100644
index 0000000000000000000000000000000000000000..8b3bd298259b3638b0ef2084d985f0220d66061f
--- /dev/null
+++ b/ewoksjob/client/process/pool.py
@@ -0,0 +1,84 @@
+import sys
+from typing import Mapping, Optional, Tuple
+from uuid import uuid4
+from contextlib import contextmanager
+import multiprocessing
+import weakref
+from concurrent.futures import ProcessPoolExecutor, Future
+
+__all__ = ["get_active_pool", "pool_context"]
+
+
+_EWOKS_WORKER_POOL = None
+
+
+def get_active_pool(raise_on_missing: Optional[bool] = True):
+    if raise_on_missing and _EWOKS_WORKER_POOL is None:
+        raise RuntimeError("No worker pool is available")
+    return _EWOKS_WORKER_POOL
+
+
+@contextmanager
+def pool_context(max_workers=1, context="spawn", **kwargs):
+    global _EWOKS_WORKER_POOL
+    if _EWOKS_WORKER_POOL is None:
+        if sys.version_info < (3, 7):
+            ctx = None
+        else:
+            ctx = multiprocessing.get_context(context)
+        with _LocalPool(max_workers=max_workers, mp_context=ctx) as pool:
+            _EWOKS_WORKER_POOL = pool
+            try:
+                yield pool
+            finally:
+                _EWOKS_WORKER_POOL = None
+    else:
+        yield _EWOKS_WORKER_POOL
+
+
+class _LocalPool(ProcessPoolExecutor):
+    def __init__(self, *args, **kwargs) -> None:
+        if sys.version_info < (3, 7):
+            kwargs.pop("mp_context", None)
+        self._jobs = weakref.WeakValueDictionary()
+        super().__init__(*args, **kwargs)
+
+    def submit(
+        self,
+        func,
+        task_id=None,
+        args: Optional[Tuple] = tuple(),
+        kwargs: Optional[Mapping] = None,
+    ) -> Future:
+        """Like celery.send_task"""
+        if kwargs is None:
+            kwargs = dict()
+        future = super().submit(func, *args, **kwargs)
+        self._patch_future(future, task_id)
+        self._jobs[future.task_id] = future
+        return future
+
+    def check_task_id(self, task_id=None):
+        if task_id is None:
+            task_id = str(uuid4())
+            while task_id in self._jobs:
+                task_id = str(uuid4())
+            return task_id
+        if task_id in self._jobs:
+            raise RuntimeError(f"Job '{task_id}' already exists")
+        return task_id
+
+    def get_future(self, task_id) -> Future:
+        future = self._jobs.get(task_id)
+        if future is None:
+            future = Future()
+            self._patch_future(future, task_id)
+        return future
+
+    def get_running(self):
+        return list(self._jobs)
+
+    def _patch_future(self, future: Future, task_id):
+        future.task_id = self.check_task_id(task_id)
+        # Warning: this causes the future to never be garbage collected
+        # future.get = future.result
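`_LocalPool.submit` deliberately mirrors the `celery.send_task` calling convention (container-style `args`/`kwargs` plus an optional `task_id`). A sketch of direct pool usage with a hypothetical `work` function:

```python
# Sketch: submit a plain function to the local pool. The "spawn" start
# method re-imports __main__ in the workers, hence the import guard;
# `work` is a hypothetical stand-in for ewoks.execute_graph.
import time
from ewoksjob.client.process import pool_context


def work(seconds):
    time.sleep(seconds)
    return seconds


if __name__ == "__main__":
    with pool_context(max_workers=2) as pool:
        future = pool.submit(work, args=(0.1,))
        print(future.task_id)            # auto-generated uuid4 string
        print(future.result(timeout=5))  # 0.1
```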
diff --git a/ewoksjob/client/process/tasks.py b/ewoksjob/client/process/tasks.py
new file mode 100644
index 0000000000000000000000000000000000000000..5a4707c429fec3d71694c5223983868a883e842e
--- /dev/null
+++ b/ewoksjob/client/process/tasks.py
@@ -0,0 +1,60 @@
+from functools import wraps
+from typing import Mapping, Optional, Tuple
+from concurrent.futures import Future
+
+from .pool import get_active_pool
+from ..test_workflow import test_workflow
+
+try:
+    from ewoks import execute_graph
+    from ewokscore import task_discovery
+except ImportError as e:
+    execute_graph = None
+    ewoks_import_error = e
+
+
+__all__ = ["trigger_workflow", "trigger_test_workflow", "discover_tasks_from_modules"]
+
+
+def _requires_ewoks(method):
+    @wraps(method)
+    def wrapper(*args, **kwargs):
+        if execute_graph is None:
+            raise ImportError(ewoks_import_error)
+        return method(*args, **kwargs)
+
+    return wrapper
+
+
+@_requires_ewoks
+def trigger_workflow(
+    args: Optional[Tuple] = tuple(), kwargs: Optional[Mapping] = None
+) -> Future:
+    pool = get_active_pool()
+    if kwargs is None:
+        kwargs = dict()
+    execinfo = kwargs.setdefault("execinfo", dict())
+    task_id = pool.check_task_id(execinfo.get("job_id"))
+    execinfo["job_id"] = task_id
+    return pool.submit(execute_graph, task_id=task_id, args=args, kwargs=kwargs)
+
+
+def trigger_test_workflow(seconds=0) -> Future:
+    return trigger_workflow(
+        args=(test_workflow(),),
+        kwargs={"inputs": [{"id": "sleepnode", "name": 0, "value": seconds}]},
+    )
+
+
+@_requires_ewoks
+def discover_tasks_from_modules(
+    args: Optional[Tuple] = tuple(), kwargs: Optional[Mapping] = None
+) -> Future:
+    pool = get_active_pool()
+    if kwargs is None:
+        kwargs = dict()
+    return pool.submit(_discover_tasks_from_modules, args=args, kwargs=kwargs)
+
+
+def _discover_tasks_from_modules(*args, **kwargs):
+    return list(task_discovery.discover_tasks_from_modules(*args, **kwargs))
diff --git a/ewoksjob/client/process/utils.py b/ewoksjob/client/process/utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..3bd2c8520f39dc4bb0048aaf4086e8ea5a8bbb2e
--- /dev/null
+++ b/ewoksjob/client/process/utils.py
@@ -0,0 +1,28 @@
+from typing import Optional
+from concurrent.futures import Future
+from .pool import get_active_pool
+
+
+__all__ = ["get_future", "cancel", "get_result", "get_running"]
+
+
+def get_future(task_id) -> Optional[Future]:
+    pool = get_active_pool()
+    return pool.get_future(task_id)
+
+
+def cancel(task_id):
+    future = get_future(task_id)
+    if future is not None:
+        future.cancel()
+
+
+def get_result(task_id, **kwargs):
+    future = get_future(task_id)
+    if future is not None:
+        return future.result(**kwargs)
+
+
+def get_running():
+    pool = get_active_pool()
+    return pool.get_running()
diff --git a/ewoksjob/client/test_workflow.py b/ewoksjob/client/test_workflow.py
new file mode 100644
index 0000000000000000000000000000000000000000..1ba8f99acbd4f2d98f249550583ff10d594b2768
--- /dev/null
+++ b/ewoksjob/client/test_workflow.py
@@ -0,0 +1,12 @@
+def test_workflow():
+    return {
+        "graph": {"id": "sleepgraph", "version": "1.0"},
+        "nodes": [
+            {
+                "id": "sleepnode",
+                "task_type": "method",
+                "task_identifier": "time.sleep",
+                "default_inputs": [{"name": 0, "value": 0}],
+            }
+        ],
+    }
diff --git a/ewoksjob/apps/config.py b/ewoksjob/config.py
similarity index 100%
rename from ewoksjob/apps/config.py
rename to ewoksjob/config.py
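This one-node `time.sleep` graph is what both `submit_test` entry points execute; a sketch of the local variant (assuming the optional `ewoks` dependency is installed):

```python
# Sketch: run the sleep test workflow on the local process pool
# (requires the optional ewoks dependency).
from ewoksjob.client import process

if __name__ == "__main__":
    with process.pool_context():
        future = process.submit_test(seconds=0)
        print(process.get_result(future.task_id, timeout=10))
        # {"return_value": None} because time.sleep returns None
```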
diff --git a/ewoksjob/server.py b/ewoksjob/server.py
deleted file mode 100644
index f10ffa7c3a6a85c3c672c89db92a58db60898949..0000000000000000000000000000000000000000
--- a/ewoksjob/server.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import sys
-from typing import Optional, Tuple
-from uuid import uuid4
-from contextlib import contextmanager
-import multiprocessing
-import weakref
-from concurrent.futures import ProcessPoolExecutor, Future
-from ewoks import execute_graph
-
-_EWOKS_WORKER_POOL = None
-
-
-def active_workflow_worker_pool():
-    return _EWOKS_WORKER_POOL
-
-
-class WorkflowWorkerPool(ProcessPoolExecutor):
-    def __init__(self, *args, **kwargs) -> None:
-        if sys.version_info < (3, 7):
-            kwargs.pop("mp_context", None)
-        self._jobs = weakref.WeakValueDictionary()
-        super().__init__(*args, **kwargs)
-
-    def submit(self, args=tuple(), kwargs=None) -> Tuple[Future, int]:
-        if kwargs is None:
-            kwargs = dict()
-        execinfo = kwargs.setdefault("execinfo", dict())
-        task_id = execinfo.get("job_id")
-        if "job_id" not in execinfo:
-            execinfo["job_id"] = task_id = str(uuid4())
-        if task_id in self._jobs:
-            raise RuntimeError(f"Job '{task_id}' already exists")
-        future = super().submit(execute_graph, *args, **kwargs)
-        future.task_id = task_id
-        self._jobs[task_id] = future
-        return future
-
-    def get_future(self, task_id) -> Optional[Future]:
-        return self._jobs.get(task_id)
-
-
-@contextmanager
-def workflow_worker_pool(max_workers=1, context="spawn"):
-    global _EWOKS_WORKER_POOL
-    if _EWOKS_WORKER_POOL is None:
-        if sys.version_info < (3, 7):
-            ctx = None
-        else:
-            ctx = multiprocessing.get_context(context)
-        with WorkflowWorkerPool(max_workers=max_workers, mp_context=ctx) as pool:
-            _EWOKS_WORKER_POOL = pool
-            try:
-                yield pool
-            finally:
-                _EWOKS_WORKER_POOL = None
-    else:
-        yield _EWOKS_WORKER_POOL
diff --git a/ewoksjob/tests/test_cancel.py b/ewoksjob/tests/test_cancel.py
new file mode 100644
index 0000000000000000000000000000000000000000..de9573978c8f78309a2dc45f891b874b0e9fb134
--- /dev/null
+++ b/ewoksjob/tests/test_cancel.py
@@ -0,0 +1,72 @@
+import time
+import logging
+import pytest
+from concurrent.futures import CancelledError
+from celery.exceptions import TaskRevokedError
+from ewokscore import events
+from ..client import celery
+from ..client import process
+
+logger = logging.getLogger(__name__)
+
+
+def test_cancel(celery_session_worker):
+    try:
+        assert_normal(celery)
+    finally:
+        events.cleanup()
+    try:
+        assert_cancel(celery)
+    finally:
+        events.cleanup()
+
+
+def test_cancel_local():
+    with process.pool_context():
+        assert_normal(process)
+        assert_cancel(process)
+
+
+def assert_normal(mod):
+    future = mod.submit_test(1)
+    if mod is process:
+        wait_running(mod, {future.task_id})
+    else:
+        logger.warning("memory and sqlite does not allow task monitoring")
+    results = mod.get_result(future.task_id)
+    assert results == {"return_value": None}
+    if mod is process:
+        del future
+        wait_running(mod, set())
+
+
+def assert_cancel(mod):
+    future = mod.submit_test(1)
+    mod.cancel(future.task_id)
+    if mod is process:
+        with pytest.raises(CancelledError):
+            future.result(timeout=2)
+        del future
+    else:
+        try:
+            future.get(timeout=2)
+        except TaskRevokedError:
+            pass
+        except Exception as e:
+            if future.status == "SUCCESS":
+                pytest.xfail("the task sometimes finishes")
+            raise AssertionError(f"Task {future.task_id} {future.status}") from e
+
+    wait_running(mod, set())
+
+
+def wait_running(mod, expected, timeout=3):
+    t0 = time.time()
+    while True:
+        task_ids = set(mod.get_running())
+        if task_ids == expected:
+            return
+        dt = time.time() - t0
+        if dt > timeout:
+            assert task_ids == expected
+        time.sleep(0.2)
diff --git a/ewoksjob/tests/test_events.py b/ewoksjob/tests/test_events.py
index 7640512be5abe0a8bbe23ba4625abeb0aead6530..ba563677e2c5eddc1048131f9afdc876c91deabe 100644
--- a/ewoksjob/tests/test_events.py
+++ b/ewoksjob/tests/test_events.py
@@ -59,7 +59,7 @@ def assert_event_reader(handlers, reader):
     events.send_workflow_event(execinfo=execinfo, event="start")
     events.send_workflow_event(execinfo=execinfo, event="end")
 
-    evts = list(reader.wait_events(timeout=0))
+    evts = list(reader.wait_events(timeout=1))
     assert len(evts) == 2
 
     evts = list(reader.get_events(type="end"))
diff --git a/ewoksjob/tests/test_future.py b/ewoksjob/tests/test_future.py
new file mode 100644
index 0000000000000000000000000000000000000000..de483657475f29c8364bdf2ed752a93c3c9ac60d
--- /dev/null
+++ b/ewoksjob/tests/test_future.py
@@ -0,0 +1,21 @@
+import pytest
+from celery.exceptions import TimeoutError as CeleryTimeoutError
+from concurrent.futures import TimeoutError as ProcessTimeoutError
+from ..client import celery
+from ..client import process
+from .utils import get_result
+
+
+def test_task_discovery(celery_session_worker):
+    future = celery.get_future("abc")
+    assert future.status == "PENDING"
+    with pytest.raises(CeleryTimeoutError):
+        future.get(timeout=1e-8)
+
+
+def test_task_discovery_local():
+    with process.pool_context():
+        future = process.get_future("abc")
+        assert not future.running()
+        with pytest.raises(ProcessTimeoutError):
+            get_result(future, timeout=0)
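As these tests rely on, both backends hand back a future even for an id they have never seen: Celery reports such a result as `PENDING`, while the process pool returns an unresolved placeholder `Future`. A sketch (assuming a configured Celery result backend for the first call):

```python
# Sketch: futures for unknown task ids exist but never resolve on their own.
from ewoksjob.client import celery, process

if __name__ == "__main__":
    print(celery.get_future("unknown-id").status)  # "PENDING"
    with process.pool_context():
        placeholder = process.get_future("unknown-id")
        print(placeholder.done())  # False: nothing will ever fulfil it
```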
diff --git a/ewoksjob/tests/test_job.py b/ewoksjob/tests/test_job.py
deleted file mode 100644
index 0a23906f95f063a6c0245759c4052a5bbae73adc..0000000000000000000000000000000000000000
--- a/ewoksjob/tests/test_job.py
+++ /dev/null
@@ -1,27 +0,0 @@
-from ewokscore.tests.examples.graphs import get_graph
-from ..server import workflow_worker_pool
-from ..client import submit, get_future
-from ..client import submit_local, get_local_future
-
-
-def test_submit(celery_session_worker):
-    graph, expected = get_graph("acyclic1")
-    expected = expected["task6"]
-    future1 = submit(args=(graph,))
-    future2 = get_future(future1.task_id)
-    results = future1.get(timeout=3)
-    assert results == expected
-    results = future2.get(timeout=0)
-    assert results == expected
-
-
-def test_submit_local():
-    with workflow_worker_pool():
-        graph, expected = get_graph("acyclic1")
-        expected = expected["task6"]
-        future1 = submit_local(args=(graph,))
-        future2 = get_local_future(future1.task_id)
-        results = future1.result(timeout=3)
-        assert results == expected
-        results = future2.result(timeout=0)
-        assert results == expected
diff --git a/ewoksjob/tests/test_submit.py b/ewoksjob/tests/test_submit.py
new file mode 100644
index 0000000000000000000000000000000000000000..8a37b2ed3b187cfb4226b078c1e041afe54a3935
--- /dev/null
+++ b/ewoksjob/tests/test_submit.py
@@ -0,0 +1,35 @@
+from ewokscore.tests.examples.graphs import get_graph
+from ..client import celery
+from ..client import process
+from .utils import get_result
+
+
+def test_submit(celery_session_worker):
+    assert_submit(celery)
+    assert_submit_test(celery)
+
+
+def test_submit_local():
+    with process.pool_context():
+        assert_submit(process)
+        assert_submit_test(process)
+
+
+def assert_submit(mod):
+    graph, expected = get_graph("acyclic1")
+    expected = expected["task6"]
+    future1 = mod.submit(args=(graph,))
+    future2 = mod.get_future(future1.task_id)
+    results = get_result(future1, timeout=3)
+    assert results == expected
+    results = get_result(future2, timeout=0)
+    assert results == expected
+
+
+def assert_submit_test(mod):
+    future1 = mod.submit_test()
+    future2 = mod.get_future(future1.task_id)
+    results = get_result(future1, timeout=3)
+    assert results == {"return_value": None}
+    results = get_result(future2, timeout=0)
+    assert results == {"return_value": None}
diff --git a/ewoksjob/tests/test_task_discovery.py b/ewoksjob/tests/test_task_discovery.py
new file mode 100644
index 0000000000000000000000000000000000000000..994bb0efe0b51d45bbb9750b2f2f29d461469451
--- /dev/null
+++ b/ewoksjob/tests/test_task_discovery.py
@@ -0,0 +1,21 @@
+from ..client import celery
+from ..client import process
+from .utils import get_result
+
+
+def test_submit(celery_session_worker):
+    assert_submit(celery)
+
+
+def test_submit_local():
+    with process.pool_context():
+        assert_submit(process)
+
+
+def assert_submit(mod):
+    future1 = mod.discover_tasks_from_modules(args=("ewokscore",))
+    future2 = mod.get_future(future1.task_id)
+    results = get_result(future1, timeout=3)
+    assert results
+    results = get_result(future2, timeout=0)
+    assert results
diff --git a/ewoksjob/tests/utils.py b/ewoksjob/tests/utils.py
index 62e6354962ee43db711f7938659767773cb61278..1ccc96c6f2c10f32ad53a419bc8c5f35ca1d6e1f 100644
--- a/ewoksjob/tests/utils.py
+++ b/ewoksjob/tests/utils.py
@@ -4,3 +4,10 @@ import os
 def has_redis_server():
     with os.popen("redis-server --version") as output:
         return bool(output.read())
+
+
+def get_result(future, **kwargs):
+    if hasattr(future, "get"):
+        return future.get(**kwargs)
+    else:
+        return future.result(**kwargs)
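The new `get_result` helper is what lets the test modules drive both backends through one code path: it duck-types on `get` (Celery's `AsyncResult`) versus `result` (`concurrent.futures.Future`). The same pattern works outside the tests:

```python
# Sketch: backend-agnostic result retrieval, mirroring the test helper.
def wait_result(future, timeout=3):
    if hasattr(future, "get"):  # celery.result.AsyncResult
        return future.get(timeout=timeout)
    return future.result(timeout=timeout)  # concurrent.futures.Future
```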
diff --git a/examples/job.py b/examples/job.py
index b9db27a220ba4ffdadaa45129228dac5bbc9618c..a5f05e67f43b2a585615ea7b1c6ba2751d6d897b 100644
--- a/examples/job.py
+++ b/examples/job.py
@@ -3,11 +3,12 @@ intermediate results (ewoks events) or final result (job return value)
 """
 
 import os
+import sys
 import argparse
 from typing import Optional
 from ewoksjob.client import submit
-from ewoksjob.client import submit_local
-from ewoksjob.server import workflow_worker_pool
+from ewoksjob.client.process import submit as submit_local
+from ewoksjob.client.process import pool_context
 from ewoksjob.events.readers import instantiate_reader
 
 # Results directory
@@ -18,10 +19,6 @@ DATA_DIR = os.path.join(SCRIPT_DIR, "results")
 def ewoks_event(celery: Optional[bool] = None):
     # Events during execution
     if celery:
-        # Load configuration ("celeryconfig" is the default)
-        # from celery import current_app
-        # current_app.config_from_object("celeryconfig")
-
         # Redis backend
         events_url = "redis://localhost:10003/2"
         handlers = [
@@ -46,7 +43,7 @@ def ewoks_event(celery: Optional[bool] = None):
 
 
 def test_workflow():
-    return {
+    workflow = {
         "graph": {"id": "mygraph", "version": "1.0"},
         "nodes": [
             {"id": "task1", "task_type": "method", "task_identifier": "numpy.add"},
@@ -60,17 +57,18 @@
             }
         ],
     }
-
-
-def job_argument():
-    reader, execinfo = ewoks_event()
-    varinfo = {"root_uri": DATA_DIR, "scheme": "nexus"}
     inputs = [
         {"id": "task1", "name": 0, "value": 1},
        {"id": "task1", "name": 1, "value": 2},
         {"id": "task2", "name": 1, "value": 3},
     ]
-    workflow = test_workflow()
+    varinfo = {"root_uri": DATA_DIR, "scheme": "nexus"}
+    return workflow, inputs, varinfo
+
+
+def job_argument():
+    reader, execinfo = ewoks_event()
+    workflow, inputs, varinfo = test_workflow()
     args = (workflow,)
     kwargs = {
         "binding": None,
@@ -116,10 +114,11 @@
     reader, args, kwargs = job_argument()
 
     if options.celery:
+        sys.path.append(SCRIPT_DIR)  # so the celery configuration can be loaded
         future = submit(args=args, kwargs=kwargs)
         workflow_results = future.get(timeout=3, interval=0.1)
     else:
-        with workflow_worker_pool():
+        with pool_context():
             future = submit_local(args=args, kwargs=kwargs)
             workflow_results = future.result(timeout=3)
 
diff --git a/examples/monitor.sh b/examples/monitor.sh
new file mode 100755
index 0000000000000000000000000000000000000000..f7962e46e13714a37a11026a8868915e86b0a3ee
--- /dev/null
+++ b/examples/monitor.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+#
+# Start Flower, the web-based Celery monitoring tool, for the ewoks application.
+# Not all configurations can be provided through the CLI (e.g. `result_serializer`)
+#
+
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+( cd $SCRIPT_DIR; celery flower )
\ No newline at end of file