Commit 7a458935 authored by Wout De Nolf
Browse files

improve logging by adding log context (workflow name, actor name)

parent e5aa438f
Pipeline #61748 passed with stages
in 36 seconds
......@@ -23,12 +23,9 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import logging
import pprint
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger("pypushflow")
class AbstractActor(ThreadCountingActor):
def __init__(self, parent=None, name=None, **kw):
......@@ -38,19 +35,18 @@ class AbstractActor(ThreadCountingActor):
self.started = False
self.finished = False
def __str__(self) -> str:
return self.name
def connect(self, actor):
logger.debug(
'Connecting actor "{0}" to actor "{1}"'.format(self.name, actor.name)
)
self.logger.debug("connect to actor '%s'", actor.name)
self.listDownStreamActor.append(actor)
def trigger(self, inData):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
self.setStarted()
self.setFinished()
for actor in self.listDownStreamActor:
logger.debug(
'In actor "{0}", triggering actor "{1}"'.format(self.name, actor.name)
)
actor.trigger(inData)
def uploadInDataToMongo(self, actorData=None, script=None):
......@@ -79,12 +75,12 @@ class AbstractActor(ThreadCountingActor):
return self.started
def setStarted(self):
logger.debug("Setting started of {0} to True".format(self.name))
self.logger.info("started")
self.started = True
def hasFinished(self):
return self.finished
def setFinished(self):
logger.debug("Setting finished of {0} to True".format(self.name))
self.logger.info("finished")
self.finished = True
......@@ -23,6 +23,7 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import pprint
from pypushflow.AbstractActor import AbstractActor
......@@ -36,6 +37,7 @@ class JoinActor(AbstractActor):
self.numberOfThreads += 1
def trigger(self, inData):
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
self.setStarted()
self.setFinished()
self.listInData.append(inData)
......
......@@ -26,7 +26,6 @@ __date__ = "28/05/2019"
import os
import time
import pprint
import logging
import datetime
import traceback
import importlib
......@@ -34,9 +33,7 @@ import multiprocessing
import multiprocessing.pool
from pypushflow.AbstractActor import AbstractActor
logger = logging.getLogger("pypushflow")
from pypushflow.logutils import PyPushflowLoggedObject
#############################################################################
......@@ -72,21 +69,20 @@ class Edna2Pool(multiprocessing.pool.Pool):
#############################################################################
class AsyncFactory:
def __init__(self, func, callback=None, errorCallback=None):
class AsyncFactory(PyPushflowLoggedObject):
def __init__(self, func, callback=None, errorCallback=None, parent=None):
super().__init__(parent=parent)
self.func = func
self.callback = callback
self.errorCallback = errorCallback
self.pool = Edna2Pool(1)
def call(self, *args, **kwargs):
logger.debug(
"Before apply_async, func=%s, callback=%s, errorCallback=%s",
self.func,
self.callback,
self.errorCallback,
self.logger.debug(
"asynchronous execution of '%s.%s'",
self.func.__module__,
self.func.__name__,
)
logger.debug("args=%s, kwargs=%s", args, kwargs)
self.pool.apply_async(
self.func,
args=args,
......@@ -95,12 +91,6 @@ class AsyncFactory:
error_callback=self.errorCallback,
)
self.pool.close()
logger.debug(
"After apply_async, func=%s, callback=%s, errorCallback=%s",
self.func,
self.callback,
self.errorCallback,
)
class PythonActor(AbstractActor):
......@@ -114,11 +104,12 @@ class PythonActor(AbstractActor):
self.inData = None
self.af = None
def connectOnError(self, errorHandler):
self.listErrorHandler.append(errorHandler)
def connectOnError(self, actor):
self.logger.debug("connect to error handler '%s'", actor.name)
self.listErrorHandler.append(actor)
def trigger(self, inData: dict):
logger.debug("In trigger %s, inData = %s", self.name, pprint.pformat(inData))
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
self.setStarted()
self.inData = dict(inData)
self.uploadInDataToMongo(actorData={"inData": inData}, script=self.script)
......@@ -126,7 +117,7 @@ class PythonActor(AbstractActor):
try:
module = importlib.import_module(os.path.splitext(self.script)[0])
except Exception as e:
logger.error("Error when trying to import script '%s'", self.script)
self.logger.error("Error when trying to import script '%s'", self.script)
time.sleep(1)
self.errorHandler(e)
return
......@@ -136,7 +127,10 @@ class PythonActor(AbstractActor):
errorHandler,
):
self.af = AsyncFactory(
module.run, callback=resultHandler, errorCallback=errorHandler
module.run,
callback=resultHandler,
errorCallback=errorHandler,
parent=self,
)
self.af.call(**self.inData)
......@@ -144,7 +138,6 @@ class PythonActor(AbstractActor):
"""Async callback in case of success"""
try:
# Handle the result
logger.debug("In resultHandler for '%s'", self.name)
self._finishedSuccess(result)
# Trigger actors
......@@ -158,12 +151,7 @@ class PythonActor(AbstractActor):
"""Async callback in case of exception"""
try:
# Handle the result
logger.debug("In errorHandler for '%s'", self.name)
logger.error(
"Error in python actor '%s'! Not running down stream actors %s",
self.name,
[actor.name for actor in self.listDownStreamActor],
)
self._logException(exception)
result = self._parseException(exception)
self._finishedFailure(result)
......@@ -172,18 +160,28 @@ class PythonActor(AbstractActor):
downstreamData["WorkflowException"] = result
self._triggerErrorHandlers(downstreamData)
except Exception:
logger.exception("In errorHandler for '%s'", self.name)
self.logger.exception("In errorHandler for '%s'", self.name)
def _parseException(self, exception: Exception) -> dict:
errorMessage = str(exception)
def _logException(self, exception: Exception) -> None:
    """Log a failure of this actor together with the actors that are skipped.

    The message names this actor, lists the downstream actors that will
    not be triggered and includes the exception itself.

    :param exception: the exception handed to the error handler.
    """
    if isinstance(exception.__cause__, multiprocessing.pool.RemoteTraceback):
        # Report the RemoteTraceback cause in place of the exception;
        # no local traceback is attached to the log record.
        exception = exception.__cause__
        exc_info = None
    elif isinstance(exception, multiprocessing.pool.MaybeEncodingError):
        # This exception has no traceback
        exc_info = None
    else:
        # Attach the traceback of the exception instance explicitly:
        # ``Logger.exception`` relies on ``sys.exc_info()``, which is only
        # populated while an ``except`` clause is executing.
        exc_info = exception
    self.logger.error(
        "Error in python actor '%s'!\n Not running down stream actors %s\n Exception:%s",
        self.name,
        [actor.name for actor in self.listDownStreamActor],
        exception,
        exc_info=exc_info,
    )
def _parseException(self, exception: Exception) -> dict:
errorMessage = str(exception)
if isinstance(exception.__cause__, multiprocessing.pool.RemoteTraceback):
exception = exception.__cause__
traceBack = traceback.format_exception(
type(exception), exception, exception.__traceback__
)
......@@ -194,11 +192,10 @@ class PythonActor(AbstractActor):
def _triggerDownStreamActors(self, downstreamData: dict):
for downStreamActor in self.listDownStreamActor:
logger.debug(
"In trigger %s, triggering actor %s, inData=%s",
self.name,
self.logger.debug(
"trigger actor '%s' with inData =\n %s",
downStreamActor.name,
downstreamData,
pprint.pformat(downstreamData),
)
downStreamActor.trigger(downstreamData)
......@@ -206,9 +203,6 @@ class PythonActor(AbstractActor):
for errorHandler in self.listErrorHandler:
errorHandler.trigger(downstreamData)
if self.parentErrorHandler is not None:
logger.error(
"Trigger on error on errorHandler '%s'", self.parentErrorHandler.name
)
self.parentErrorHandler.triggerOnError(inData=downstreamData)
def _finishedSuccess(self, result: dict):
......
......@@ -23,12 +23,9 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import pprint
from pypushflow.AbstractActor import AbstractActor
import logging
logger = logging.getLogger("pypushflow")
class RouterActor(AbstractActor):
def __init__(
......@@ -38,7 +35,7 @@ class RouterActor(AbstractActor):
name="Router",
itemName=None,
listPort=None,
**kw
**kw,
):
super().__init__(parent=parent, name=name, **kw)
self.errorHandler = errorHandler
......@@ -53,9 +50,7 @@ class RouterActor(AbstractActor):
def connect(self, actor, expectedValue="other"):
if expectedValue != "other" and expectedValue not in self.listPort:
raise RuntimeError(
"Port {0} not defined for router actor {1}!".format(
expectedValue, self.name
)
f"Port {expectedValue} not defined for router actor {self.name}!"
)
if expectedValue in self.dictValues:
self.dictValues[expectedValue].append(actor)
......@@ -63,18 +58,14 @@ class RouterActor(AbstractActor):
self.dictValues[expectedValue] = [actor]
def trigger(self, inData):
logger.debug('In router actor "{0}"'.format(self.name))
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
self.setStarted()
self.setFinished()
listActor = None
if self.itemName in inData:
logger.debug(
'In router actor "{0}", itemName {1} in inData'.format(
self.name, self.itemName
)
)
self.logger.debug("router item = '%s'", self.itemName)
value = inData[self.itemName]
logger.debug('In router actor "{0}", value = {1}'.format(self.name, value))
# The format string has a single %s placeholder, so pass only the value;
# an extra positional argument makes logging fail while formatting the record.
self.logger.debug("router item value = %s", value)
if value in [None, "None", "null"]:
value = "null"
elif type(value) == bool:
......@@ -85,17 +76,10 @@ class RouterActor(AbstractActor):
if not isinstance(value, dict) and value in self.dictValues:
listActor = self.dictValues[value]
if listActor is None:
logger.debug('In router actor "{0}", actor is None')
self.logger.debug("no router destinations for inData")
if "other" in self.dictValues:
listActor = self.dictValues["other"]
else:
raise RuntimeError(
'No "other" port for router actor "{0}"'.format(self.name)
)
raise RuntimeError(f"No 'other' port for router actor '{self.name}'")
for actor in listActor:
logger.debug(
'In router actor "{0}", triggering actor "{1}"'.format(
self.name, actor.name
)
)
actor.trigger(inData)
......@@ -23,13 +23,10 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import logging
import pprint
from pypushflow import Submodel
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger("pypushflow")
class StopActor(ThreadCountingActor):
def __init__(self, parent=None, errorHandler=None, name="Stop actor", **kw):
......@@ -38,9 +35,7 @@ class StopActor(ThreadCountingActor):
self.outData = None
def trigger(self, inData):
logger.debug(
"In trigger {0}, errorHandler = {1}".format(self.name, self.errorHandler)
)
self.logger.info("triggered with inData =\n %s", pprint.pformat(inData))
if self.parent is not None and not isinstance(self.parent, Submodel.Submodel):
# Parent is a Workflow
self.outData = inData
......@@ -51,18 +46,10 @@ class StopActor(ThreadCountingActor):
def join(self, timeout=7200):
if self.parent is not None:
logger.debug(
"In {0}, parent {1}, before wait_threads_finished".format(
self.name, self.parent.name
)
)
self.logger.debug("wait for scheduler threads to be finished")
success = self._wait_threads_finished(timeout=timeout)
if self.parent is not None:
logger.debug(
"In {0}, parent {1}, after wait_threads_finished".format(
self.name, self.parent.name
)
)
self.logger.debug("scheduler threads are finished")
self._finalizeInMongo(success)
return success
......@@ -70,14 +57,8 @@ class StopActor(ThreadCountingActor):
if self.parent is None:
return
if success:
logger.debug(
"In {0}, parent {1}, finished".format(self.name, self.parent.name)
)
self.logger.debug("finished")
self.parent.setStatus("finished")
else:
logger.error(
"In {0}, parent {1}, timeout detected".format(
self.name, self.parent.name
)
)
self.logger.error("timeout detected")
self.parent.setStatus("timeout")
......@@ -24,11 +24,9 @@ __license__ = "MIT"
__date__ = "28/05/2019"
import logging
import pprint
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger("pypushflow")
from pypushflow.logutils import PyPushflowLoggedObject
class Port(ThreadCountingActor):
......@@ -39,32 +37,35 @@ class Port(ThreadCountingActor):
self.inPortTrigger = None
def connect(self, actor):
    """Register ``actor`` as a destination of this port.

    :param actor: object with a ``name`` attribute; it is appended to
        ``self.listActor``.
    """
    self.logger.debug("connecting to '%s'", actor.name)
    self.listActor.append(actor)
def setTrigger(self, trigger):
self.inPortTrigger = trigger
def trigger(self, inData):
    """Forward ``inData`` to every connected actor and to the port trigger.

    :param inData: data passed unchanged to each actor in
        ``self.listActor`` and then to ``self.inPortTrigger`` when one
        has been set.
    """
    self.logger.info(
        "triggered with inData =\n %s",
        pprint.pformat(inData),
    )
    # Iterating an empty list is a no-op, so no length check is needed.
    for actor in self.listActor:
        self.logger.debug(
            "In trigger '%s' -> actorName '%s'",
            self.errorHandler.name,
            actor.name,
        )
        actor.trigger(inData)
    if self.inPortTrigger is not None:
        self.logger.debug(
            "In '%s' trigger, trigger = '%s'",
            self.errorHandler.name,
            self.inPortTrigger,
        )
        self.inPortTrigger(inData)
class Submodel:
class Submodel(PyPushflowLoggedObject):
def __init__(
self,
parent=None,
......@@ -73,6 +74,7 @@ class Submodel:
portNames=["In", "Out"],
thread_counter=None,
):
super().__init__(log_metadata={"submodel": name}, parent=parent)
self.parent = parent
self.name = name
if errorHandler is None:
......@@ -97,38 +99,23 @@ class Submodel:
return self.parent.getActorPath() + "/" + self.name.replace("%", " ")
def getPort(self, portName):
logger.debug("In {0} getPort, portName = {1}".format(self.name, portName))
return self.dictPort[portName]
def connect(self, actor, portName="Out"):
logger.debug(
"In {0} connect, portName = {2} -> actorName = {1}".format(
self.name, actor.name, portName
)
)
self.dictPort[portName].connect(actor)
def connectOnError(self, actor):
logger.debug(
"In connectOnError in subModule {0}, actor name {1}".format(
self.name, actor.name
)
)
self.logger.debug("connect to error handler '%s'", actor.name)
self.listOnErrorActor.append(actor)
def triggerOnError(self, *args, **kwargs):
def triggerOnError(self, inData):
self.logger.info(
"triggered due to error with inData =\n %s", pprint.pformat(inData)
)
for onErrorActor in self.listOnErrorActor:
logger.debug(
"In triggerOnError in subModule {0}, trigger actor {1}, inData = {2}".format(
self.name, onErrorActor.name, args[0]
)
)
onErrorActor.trigger(*args, **kwargs)
onErrorActor.trigger(inData)
if self.errorHandler is not None:
logger.error(
'Trigger on error on errorHandler "{0}"'.format(self.errorHandler.name)
)
self.errorHandler.triggerOnError(*args, **kwargs)
self.errorHandler.triggerOnError(inData)
def addActorRef(self, actorRef):
if self.parent is not None:
......
import logging
from threading import Condition
from pypushflow.logutils import PyPushflowLoggedObject
logger = logging.getLogger("pypushflow")
class ThreadCounter:
class ThreadCounter(PyPushflowLoggedObject):
"""Scheduling thread counter"""
def __init__(self):
def __init__(self, parent=None):
self.__counter = 0
self.__condition = Condition()
super().__init__(parent=parent)
def start_thread(self, msg=None):
with self.__condition:
......@@ -48,4 +46,4 @@ class ThreadCounter:
def _log_counter_change(self, msg=None):
if msg is None:
msg = "Thread counter changed"
logger.debug("%s (%d threads running)", msg, self.__counter)
self.logger.debug("%s (%d threads running)", msg, self.__counter)
from functools import wraps
from contextlib import contextmanager
from pypushflow.logutils import PyPushflowLoggedObject
def with_thread_context(trigger):
......@@ -26,7 +27,7 @@ def callback_with_end_thread(async_callback, end_thread, log_msg):
return wrapper
class ThreadCountingActor:
class ThreadCountingActor(PyPushflowLoggedObject):
"""The `trigger` method of will increase the thread counter
at the start and decrease the thread counter at the end.
"""
......@@ -36,6 +37,7 @@ class ThreadCountingActor:
raise RuntimeError("Actor name is None!")
if thread_counter is None:
raise ValueError("Actor requires a 'thread_counter' argument")
super().__init__(log_metadata={"actor": name}, parent=parent)
self.name = name
self.parent = parent
if parent is not None:
......@@ -101,5 +103,5 @@ class ThreadCountingActor:
def _wait_threads_finished(self, **kw):
return self.__thread_counter.wait_threads_finished(**kw)
def trigger(self, **kw):
def trigger(self, inData):
raise NotImplementedError
......@@ -24,79 +24,43 @@ __license__ = "MIT"
__date__ = "28/05/2019"
import os
import pprint
import logging
import pathlib
import logging.handlers
from pypushflow.persistence import db_client
from pypushflow.logutils import PyPushflowLoggedObject
from pypushflow.logutils import basicConfig
class Workflow(object):
class Workflow(PyPushflowLoggedObject):
def __init__(self, name):
basicConfig(filename=name)
super().__init__(log_metadata={"workflow": name})
self.logger.info("\n\nStarting new workflow %s\n", name)
self.name = name
self.listOnErrorActor = []
self.db_client = db_client()
self.db_client.startWorkflow(name)
self.listActorRef = []
self.logger = self.initLogger(name)
def connectOnError(self, actor):
self.logger.debug(
"In Workflow '{0}' connectOnError, actor name {1}".format(
self.name, actor.name
)
)
self.logger.debug("connect to error handler '%s'", actor.name)
self.listOnErrorActor.append(actor)
def triggerOnError(self, inData):
self.logger.debug("In Workflow '{0}' triggerOnError, inData:".format(self.name))
self.logger.debug(pprint.pformat(inData))
self.logger.info(
"triggered due to error with inData =\n %s", pprint.pformat(inData)
)
for onErrorActor in self.listOnErrorActor:
self.logger.debug(
"In Workflow '{0}' triggerOnError, triggering actor name {1}".format(
self.name, onErrorActor.name
)
)
onErrorActor.trigger(inData)
def getActorPath(self):
return "/" + self.name