Commit 6e9d6766 authored by payno's avatar payno
Browse files

Using a pool with no daemon process

parent 7cc4f442
...@@ -28,12 +28,13 @@ import pprint ...@@ -28,12 +28,13 @@ import pprint
import logging import logging
import traceback import traceback
import functools import functools
import multiprocessing.pool import multiprocessing
from multiprocessing.pool import Pool as _Pool
from pypushflow.AbstractActor import AbstractActor from pypushflow.AbstractActor import AbstractActor
from pypushflow.representation.scheme.node import Node from pypushflow.representation.scheme.node import Node
logger = logging.getLogger('pypushflow') logger = logging.getLogger(__name__)
class WorkflowException(Exception): class WorkflowException(Exception):
...@@ -81,7 +82,7 @@ class NoDaemonProcess(multiprocessing.Process): ...@@ -81,7 +82,7 @@ class NoDaemonProcess(multiprocessing.Process):
# because the latter is only a wrapper function, not a proper class. # because the latter is only a wrapper function, not a proper class.
class Edna2Pool(multiprocessing.pool.Pool): class Pool(_Pool):
Process = NoDaemonProcess Process = NoDaemonProcess
# #
...@@ -115,9 +116,7 @@ class AsyncFactory: ...@@ -115,9 +116,7 @@ class AsyncFactory:
self.node = node self.node = node
self.callback = callback self.callback = callback
self.errorCallback = errorCallback self.errorCallback = errorCallback
# self.pool = Edna2Pool(1) self.pool = Pool(1)
self.pool = multiprocessing.Pool(1)
# TODO: this shouldn't be limited to 1
def call(self, *args, **kwargs): def call(self, *args, **kwargs):
logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format( logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format(
...@@ -273,6 +272,7 @@ class PythonActor(AbstractActor): ...@@ -273,6 +272,7 @@ class PythonActor(AbstractActor):
'errorMessage': workflowException.errorMessage, 'errorMessage': workflowException.errorMessage,
'traceBack': workflowException.traceBack.split('\n'), 'traceBack': workflowException.traceBack.split('\n'),
} }
logger.warning('oldInData type: {}, value: {}'.format(type(oldInData), oldInData))
oldInData['WorkflowException'] = exceptionDict oldInData['WorkflowException'] = exceptionDict
for errorHandler in self.list_error_handler: for errorHandler in self.list_error_handler:
errorHandler.trigger((None, oldInData)) errorHandler.trigger((None, oldInData))
......
Subproject commit e99104e134c9932d2be81c3870f5c923ba62cccc Subproject commit 4513c7e79561e6e687badce4f98fb9dd8a26ac04
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment