From 93e355505466bfe5c110a8aa7aab15c30826e4f9 Mon Sep 17 00:00:00 2001
From: woutdenolf
Date: Sun, 26 Mar 2023 12:31:28 +0200
Subject: [PATCH] add solo pool (no concurrency)

---
 doc/concurrency.rst                    |  7 +++-
 src/pypushflow/concurrent/factory.py   |  2 +
 src/pypushflow/concurrent/solo.py      | 40 +++++++++++++++++++
 .../tests/concurrent/test_interrupt.py |  4 +-
 src/pypushflow/tests/test_workflow5.py |  1 +
 5 files changed, 50 insertions(+), 4 deletions(-)
 create mode 100644 src/pypushflow/concurrent/solo.py

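A minimal usage sketch of the new solo pool, for reference only (it assumes
`base.BasePool.__init__` needs no arguments beyond those forwarded here).
Because the solo pool provides no concurrency, `apply_async` runs the task
synchronously and invokes `callback` or `error_callback` before returning:

    from pypushflow.concurrent.solo import SoloPool

    def add(x, y=0):
        return x + y

    pool = SoloPool()
    # Runs add(1, y=2) in the current process; the callback receives the result.
    pool.apply_async(
        add,
        args=(1,),
        kwargs={"y": 2},
        callback=lambda result: print("result:", result),
        error_callback=lambda exc: print("error:", exc),
    )
    pool.close()  # further apply_async() calls now raise RuntimeError
    pool.join()   # returns True immediately; there is nothing to wait for
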
diff --git a/doc/concurrency.rst b/doc/concurrency.rst
index 8afc015..a43374a 100644
--- a/doc/concurrency.rst
+++ b/doc/concurrency.rst
@@ -43,6 +43,7 @@ Pypushflow supports these pools for concurrent execution of workflow tasks
 * multiprocessing: pool of daemonic processes from the `multiprocessing` module
 * ndmultiprocessing: pool of non-daemonic processes from the `multiprocessing` module
 * billiard: pool of daemonic processes from the `billiard` library
+* solo: does not provide concurrency and executes tasks in the current process
 
 The pool type `process` and `ndprocess` are the same, but `ndprocess` uses explicit non-daemonic processes like `ndmultiprocessing`.
 
@@ -60,7 +61,7 @@ Workflow tasks
 * bpool: process pool from the `billiard` library
 * bprocess: process from the `billiard` library
 
-Pool-task compatibility
+Pool-Task compatibility
 -----------------------
 
 Workflow tasks that can be used in each pool type (no gevent monkey patching)
@@ -72,6 +73,7 @@ Workflow tasks that can be used in each pool type (no gevent monkey patching)
 * multiprocessing: simple, subprocess, bpool (hangs sometimes), bprocess
 * ndmultiprocessing: simple, subprocess, cfpool, mppool, mpprocess, bpool (hangs sometimes), bprocess
 * billiard: simple, subprocess, bpool (hangs sometimes), bprocess
+* solo: simple, subprocess, cfpool, mppool, mpprocess, bpool, bprocess
 
 Workflow tasks that can be used in each pool type (with gevent monkey patching)
 
@@ -81,4 +83,5 @@ Workflow tasks that can be used in each pool type (with gevent monkey patching)
 * ndprocess: simple, subprocess, cfpool (not spawn), mpprocess (not spawn), bprocess (not spawn)
 * multiprocessing: -
 * ndmultiprocessing: -
-* billiard: -
\ No newline at end of file
+* billiard: -
+* solo: simple, subprocess, cfpool, mppool, mpprocess, bpool, bprocess
\ No newline at end of file
diff --git a/src/pypushflow/concurrent/factory.py b/src/pypushflow/concurrent/factory.py
index ea7ff72..9574738 100644
--- a/src/pypushflow/concurrent/factory.py
+++ b/src/pypushflow/concurrent/factory.py
@@ -2,6 +2,7 @@ import sys
 import logging
 from typing import Optional
 
+from .solo import SoloPool
 from .thread import ThreadPool
 from .process import ProcessPool
 
@@ -38,6 +39,7 @@ from .scaling import ScalingPool
 logger = logging.getLogger(__name__)
 
 _POOLS = {
+    "solo": SoloPool,
     "thread": ThreadPool,
     "process": ProcessPool,
     "ndprocess": NdProcessPool,
diff --git a/src/pypushflow/concurrent/solo.py b/src/pypushflow/concurrent/solo.py
new file mode 100644
index 0000000..00f9188
--- /dev/null
+++ b/src/pypushflow/concurrent/solo.py
@@ -0,0 +1,40 @@
+from numbers import Number
+from typing import Callable, Optional
+from . import base
+
+
+class SoloPool(base.BasePool):
+    def __init__(self, **kw) -> None:
+        self._closed = False
+        super().__init__(**kw)
+
+    def close(self):
+        self._closed = True
+
+    def join(self, timeout: Optional[Number] = None) -> bool:
+        return True
+
+    def interrupt(self) -> None:
+        pass
+
+    def apply_async(
+        self,
+        fn: Callable,
+        callback: Optional[Callable] = None,
+        error_callback: Optional[Callable] = None,
+        args=tuple(),
+        kwargs=None,
+    ) -> None:
+        """Since a solo pool does not provide concurrency, this is a synchronous call."""
+        if self._closed:
+            raise RuntimeError("the pool is closed")
+        if kwargs is None:
+            kwargs = dict()
+        try:
+            result = fn(*args, **kwargs)
+        except Exception as e:
+            if error_callback is not None:
+                error_callback(e)
+        else:
+            if callback is not None:
+                callback(result)
diff --git a/src/pypushflow/tests/concurrent/test_interrupt.py b/src/pypushflow/tests/concurrent/test_interrupt.py
index e93d5cd..7596e67 100644
--- a/src/pypushflow/tests/concurrent/test_interrupt.py
+++ b/src/pypushflow/tests/concurrent/test_interrupt.py
@@ -17,8 +17,8 @@ def create_file(sleep_time: Number, filename: str) -> str:
 @pytest.mark.parametrize("max_workers", [None, 1])
 @pytest.mark.parametrize("pool_type", utils.POOLS)
 def test_interrupt(scaling, max_workers, pool_type, tmpdir):
-    if pool_type == "thread":
-        pytest.skip("threads cannot be interrupted")
+    if pool_type in ("thread", "solo"):
+        pytest.skip("cannot be interrupted")
     if os.name == "nt" and pool_type in (
         "process",
         "ndprocess",
diff --git a/src/pypushflow/tests/test_workflow5.py b/src/pypushflow/tests/test_workflow5.py
index 9ef6799..8e757fc 100644
--- a/src/pypushflow/tests/test_workflow5.py
+++ b/src/pypushflow/tests/test_workflow5.py
@@ -68,6 +68,7 @@ class Workflow5(Workflow):
 
 class TestWorkflow5(WorkflowTestCase):
     def test_Workflow5(self):
+        """This test will fail for a solo pool"""
         testWorkflow5 = Workflow5("Test workflow 5")
         inData = {"name": "Dog", "sleep": 5}
         outData = testWorkflow5.run(
-- 
GitLab