Commit 19eb428d authored by Wout De Nolf
Browse files

simplify result/error handling in PythonActor

parent 81b6802c
Pipeline #54753 passed with stages in 41 seconds
......@@ -29,7 +29,6 @@ import pprint
import logging
import datetime
import traceback
import functools
import importlib
import multiprocessing
import multiprocessing.pool
......@@ -40,31 +39,6 @@ from pypushflow.AbstractActor import AbstractActor
logger = logging.getLogger("pypushflow")
class ActorWrapperException(Exception):
    """Error container produced when a wrapped actor method fails.

    The instance carries the error message, the formatted traceback and the
    data that was being processed when the failure happened.

    :param errorMessage: text describing the failure
    :param traceBack: formatted traceback text
    :param data: data associated with the failure; a new empty dict is used
        when omitted
    :param msg: value forwarded to ``Exception.__init__``
    """

    def __init__(self, errorMessage="", traceBack="", data=None, msg=None):
        super().__init__(msg)
        self.errorMessage = errorMessage
        # Consumers write keys into ``data`` (e.g. "WorkflowException"), so
        # every instance needs its own dict: a ``{}`` default in the signature
        # would be shared and leak those keys between unrelated exceptions.
        self.data = {} if data is None else data
        self.traceBack = traceBack
def trace_unhandled_exceptions(func):
    """Decorate *func* so that exceptions are returned instead of raised.

    The wrapper returns the value of *func* unchanged. When *func* raises an
    ``Exception``, the error is logged together with its traceback and an
    ``ActorWrapperException`` holding the message, the formatted traceback
    and the input data is returned to the caller.

    :param func: callable to wrap
    :returns: the wrapping callable (metadata preserved by ``functools.wraps``)
    """

    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            errorMessage = "{0}".format(e)
            logger.exception(errorMessage)
            traceBack = traceback.format_exc()
            # The input data is taken from the second positional argument.
            # Indexing blindly would raise IndexError from inside this
            # handler and hide the original failure, so fall back to an
            # empty dict when that argument is absent.
            data = args[1] if len(args) > 1 else {}
            return ActorWrapperException(
                errorMessage=errorMessage, traceBack=traceBack, data=data
            )

    return wrapped_func
#############################################################################
# Create no daemon processes
# See : https://stackoverflow.com/a/53180921
......@@ -104,15 +78,15 @@ class AsyncFactory:
self.callback = callback
self.errorCallback = errorCallback
self.pool = Edna2Pool(1)
self.hasFinished = False
def call(self, *args, **kwargs):
logger.debug(
"Before apply_async, func={0}, callback={1}, errorCallback={2}".format(
self.func, self.callback, self.errorCallback
)
"Before apply_async, func=%s, callback=%s, errorCallback=%s",
self.func,
self.callback,
self.errorCallback,
)
logger.debug("args={0}, kwargs={1}".format(args, kwargs))
logger.debug("args=%s, kwargs=%s", args, kwargs)
self.pool.apply_async(
self.func,
args=args,
......@@ -121,23 +95,12 @@ class AsyncFactory:
error_callback=self.errorCallback,
)
self.pool.close()
logger.debug("After apply_async")
class ActorWrapper:
def __init__(self, name, method):
self.name = name
self.method = method
@trace_unhandled_exceptions
def run(self, *args, **kwargs):
logger.debug("In actor wrapper for {0}".format(self.name))
logger.debug(
"args={0}, kwargs={1}, method={2}".format(args, kwargs, self.method)
"After apply_async, func=%s, callback=%s, errorCallback=%s",
self.func,
self.callback,
self.errorCallback,
)
inData = args[0]
outData = self.method(**inData)
return outData
class PythonActor(AbstractActor):
......@@ -148,126 +111,117 @@ class PythonActor(AbstractActor):
self.parentErrorHandler = errorHandler
self.listErrorHandler = []
self.script = script
self.actorWrapper = None
self.inData = None
self.outData = None
self.af = None
def connectOnError(self, errorHandler):
    """Add *errorHandler* to the end of this actor's ``listErrorHandler``."""
    handlers = self.listErrorHandler
    handlers.append(errorHandler)
def trigger(self, inData):
def trigger(self, inData: dict):
logger.info("In trigger %s, inData = %s", self.name, pprint.pformat(inData))
self.setStarted()
self.inData = dict(inData)
self.uploadInDataToMongo(actorData={"inData": inData}, script=self.script)
logger.debug(
"In trigger {0}, inData = {1}".format(self.name, pprint.pformat(inData))
)
if isinstance(inData, ActorWrapperException):
logger.error(
"Error from previous actor! Not running actor {0}".format(self.name)
)
if self.parentErrorHandler is not None:
workflowException = inData
oldInData = workflowException.data
exceptionDict = {
"errorMessage": workflowException.errorMessage,
"traceBack": workflowException.traceBack.split("\n"),
}
oldInData["WorkflowException"] = exceptionDict
self.parentErrorHandler.triggerOnError(oldInData)
try:
module = importlib.import_module(os.path.splitext(self.script)[0])
except Exception as e:
logger.error("Error when trying to import script {0}".format(self.script))
logger.error("Error when trying to import script '%s'", self.script)
time.sleep(1)
self.errorHandler(inData, e)
self.errorHandler(e)
return
with self._postpone_end_thread(self.resultHandler, self.errorHandler) as (
resultHandler,
errorHandler,
):
self.af = AsyncFactory(
module.run, callback=resultHandler, errorCallback=errorHandler
)
self.af.call(**self.inData)
def resultHandler(self, result: dict):
    """Success callback: record *result*, then trigger the downstream actors.

    The downstream actors are triggered with a copy of ``self.inData``
    updated with *result*, so keys in *result* override input keys.
    """
    logger.debug("In resultHandler for '%s'", self.name)

    # Book-keeping for the finished actor comes first.
    self._finishedSuccess(result)

    # Build the merged payload on a copy so ``self.inData`` stays untouched.
    mergedData = dict(self.inData)
    mergedData.update(result)
    self._triggerDownStreamActors(mergedData)
def errorHandler(self, exception: Exception):
    """Failure callback: record the error, then trigger the error handlers.

    The exception is converted with ``self._parseException`` and the result
    is stored under the "WorkflowException" key of a copy of ``self.inData``
    before that copy is passed to ``self._triggerErrorHandlers``.
    """
    actorName = self.name
    skippedActors = [actor.name for actor in self.listDownStreamActor]
    logger.error(
        "Error in python actor '%s'! Not running down stream actors %s",
        actorName,
        skippedActors,
    )

    # Book-keeping for the failed actor comes first.
    failure = self._parseException(exception)
    self._finishedFailure(failure)

    # Work on a copy so ``self.inData`` stays untouched.
    errorData = dict(self.inData)
    errorData["WorkflowException"] = failure
    self._triggerErrorHandlers(errorData)
def _parseException(self, exception: Exception) -> dict:
errorMessage = str(exception)
if isinstance(exception.__cause__, multiprocessing.pool.RemoteTraceback):
exception = exception.__cause__
logger.error(exception)
elif isinstance(exception, multiprocessing.pool.MaybeEncodingError):
# This exception has no traceback
logger.error(exception)
else:
def errorHandler(e):
self.errorHandler(inData, e)
with self._postpone_end_thread(
self.triggerDownStreamActor, errorHandler
) as (callback, errorCallback):
actorWrapper = ActorWrapper(self.name, module.run)
self.af = AsyncFactory(
actorWrapper.run, callback=callback, errorCallback=errorCallback
)
self.af.call(self.inData)
def errorHandler(self, inData, exception):
logger.error("Error when running actor {0}!".format(self.name))
self.setFinished()
errorMessage = "{0}".format(exception)
logger.exception(errorMessage)
traceBack = traceback.format_exc().split("\n")
# workflowException = WorkflowException(
# errorMessage=errorMessage,
# traceBack=traceBack
# )
outData = dict(inData)
outData["WorkflowException"] = {
logger.exception(exception)
traceBack = traceback.format_exception(
type(exception), exception, exception.__traceback__
)
return {
"errorMessage": errorMessage,
"traceBack": traceBack,
}
logger.error(exception)
def _triggerDownStreamActors(self, downstreamData: dict):
    """Call ``trigger(downstreamData)`` on each actor in ``listDownStreamActor``.

    Actors are triggered in list order; a debug message is logged before
    each call.
    """
    for actor in self.listDownStreamActor:
        logger.debug(
            "In trigger %s, triggering actor %s, inData=%s",
            self.name,
            actor.name,
            downstreamData,
        )
        actor.trigger(downstreamData)
def _triggerErrorHandlers(self, downstreamData: dict):
for errorHandler in self.listErrorHandler:
errorHandler.trigger(outData)
errorHandler.trigger(downstreamData)
if self.parentErrorHandler is not None:
self.parentErrorHandler.triggerOnError(outData)
def triggerDownStreamActor(self, inData={}):
logger.debug("In triggerDownStreamActor for {0}".format(self.name))
self.setFinished()
if isinstance(inData, ActorWrapperException):
logger.error(
"Error from previous actor! Not running down stream actors {0}".format(
[actor.name for actor in self.listDownStreamActor]
)
"Trigger on error on errorHandler '%s'", self.parentErrorHandler.name
)
workflowException = inData
oldInData = workflowException.data
exceptionDict = {
"errorMessage": workflowException.errorMessage,
"traceBack": workflowException.traceBack.split("\n"),
self.parentErrorHandler.triggerOnError(inData=downstreamData)
def _finishedSuccess(self, result: dict):
self.setFinished()
self.uploadOutDataToMongo(
actorData={
"stopTime": datetime.datetime.now(),
"status": "finished",
"outData": result,
}
oldInData["WorkflowException"] = exceptionDict
self.uploadOutDataToMongo(
actorData={
"stopTime": datetime.datetime.now(),
"status": "error",
"outData": exceptionDict,
}
)
for errorHandler in self.listErrorHandler:
errorHandler.trigger(oldInData)
if self.parentErrorHandler is not None:
logger.error(
'Trigger on error on errorHandler "{0}"'.format(
self.parentErrorHandler.name
)
)
self.parentErrorHandler.triggerOnError(inData=oldInData)
else:
outData = dict(inData)
self.uploadOutDataToMongo(
actorData={
"stopTime": datetime.datetime.now(),
"status": "finished",
"outData": outData,
}
)
if "workflowLogFile" in outData:
self.setMongoAttribute("logFile", outData["workflowLogFile"])
if "workflowDebugLogFile" in outData:
self.setMongoAttribute("debugLogFile", outData["workflowDebugLogFile"])
downstreamData = dict(self.inData)
downstreamData.update(outData)
for downStreamActor in self.listDownStreamActor:
logger.debug(
"In trigger {0}, triggering actor {1}, inData={2}".format(
self.name, downStreamActor.name, downstreamData
)
)
downStreamActor.trigger(downstreamData)
)
if "workflowLogFile" in result:
self.setMongoAttribute("logFile", result["workflowLogFile"])
if "workflowDebugLogFile" in result:
self.setMongoAttribute("debugLogFile", result["workflowDebugLogFile"])
def _finishedFailure(self, result: dict):
    """Mark the actor as finished and hand over the error record.

    Calls ``self.setFinished()`` and then ``self.uploadOutDataToMongo`` with
    the current time as "stopTime", the status "error" and *result* as
    "outData".
    """
    self.setFinished()
    failureRecord = {
        "stopTime": datetime.datetime.now(),
        "status": "error",
        "outData": result,
    }
    self.uploadOutDataToMongo(actorData=failureRecord)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment