Commit 3f8990b0 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

black auto format

parent b382c6cd
......@@ -28,11 +28,10 @@ import logging
from pypushflow import UtilsMongoDb
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger('pypushflow')
logger = logging.getLogger("pypushflow")
class AbstractActor(ThreadCountingActor):
def __init__(self, parent=None, name=None, **kw):
super().__init__(name=name, parent=parent, **kw)
self.listDownStreamActor = []
......@@ -41,29 +40,29 @@ class AbstractActor(ThreadCountingActor):
self.finished = False
def connect(self, actor):
logger.debug('Connecting actor "{0}" to actor "{1}"'.format(
self.name, actor.name
))
logger.debug(
'Connecting actor "{0}" to actor "{1}"'.format(self.name, actor.name)
)
self.listDownStreamActor.append(actor)
def trigger(self, inData):
self.setStarted()
self.setFinished()
for actor in self.listDownStreamActor:
logger.debug('In actor "{0}", triggering actor "{1}"'.format(
self.name, actor.name
))
logger.debug(
'In actor "{0}", triggering actor "{1}"'.format(self.name, actor.name)
)
actor.trigger(inData)
def uploadInDataToMongo(self, actorData={}, script=None):
if self.parent is not None:
if self.parent.mongoId is not None:
actorPath = self.getActorPath() + '/' + self.name
actorPath = self.getActorPath() + "/" + self.name
self.actorId = UtilsMongoDb.initActor(
workflowId=self.parent.mongoId,
name=actorPath,
actorData=actorData,
script=script
script=script,
)
def uploadOutDataToMongo(self, actorData={}, script=None):
......@@ -72,7 +71,7 @@ class AbstractActor(ThreadCountingActor):
UtilsMongoDb.addDataToActor(
workflowId=self.parent.mongoId,
actorId=self.actorId,
actorData=actorData
actorData=actorData,
)
def setMongoAttribute(self, attribute, value):
......@@ -96,4 +95,3 @@ class AbstractActor(ThreadCountingActor):
def setFinished(self):
logger.debug("Setting finished of {0} to True".format(self.name))
self.finished = True
......@@ -29,11 +29,10 @@ from pypushflow.AbstractActor import AbstractActor
class ErrorHandler(AbstractActor):
def __init__(self, parent=None, name='Error handler', **kw):
def __init__(self, parent=None, name="Error handler", **kw):
super().__init__(parent=parent, name=name, **kw)
def trigger(self, inData):
if self.parent is not None and hasattr(self.parent, 'mongoId'):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, 'error')
if self.parent is not None and hasattr(self.parent, "mongoId"):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, "error")
super().trigger(inData=inData)
......@@ -27,6 +27,5 @@ from pypushflow.AbstractActor import AbstractActor
class ForkActor(AbstractActor):
def __init__(self, parent=None, name='Fork actor', **kw):
def __init__(self, parent=None, name="Fork actor", **kw):
super().__init__(parent=parent, name=name, **kw)
......@@ -27,8 +27,7 @@ from pypushflow.AbstractActor import AbstractActor
class JoinActor(AbstractActor):
def __init__(self, parent=None, name='Join actor', **kw):
def __init__(self, parent=None, name="Join actor", **kw):
super().__init__(parent=parent, name=name, **kw)
self.numberOfThreads = 0
self.listInData = []
......@@ -45,4 +44,4 @@ class JoinActor(AbstractActor):
for data in self.listInData:
newInData.update(data)
for actor in self.listDownStreamActor:
actor.trigger(newInData)
\ No newline at end of file
actor.trigger(newInData)
......@@ -37,11 +37,10 @@ import multiprocessing.pool
from pypushflow.AbstractActor import AbstractActor
logger = logging.getLogger('pypushflow')
logger = logging.getLogger("pypushflow")
class ActorWrapperException(Exception):
def __init__(self, errorMessage="", traceBack="", data={}, msg=None):
super(ActorWrapperException, self).__init__(msg)
self.errorMessage = errorMessage
......@@ -55,15 +54,14 @@ def trace_unhandled_exceptions(func):
try:
outData = func(*args, **kwargs)
except Exception as e:
errorMessage = '{0}'.format(e)
errorMessage = "{0}".format(e)
logger.exception(errorMessage)
traceBack = traceback.format_exc()
return ActorWrapperException(
errorMessage=errorMessage,
traceBack=traceBack,
data=args[1]
errorMessage=errorMessage, traceBack=traceBack, data=args[1]
)
return outData
return wrapped_func
......@@ -109,32 +107,43 @@ class AsyncFactory:
self.hasFinished = False
def call(self, *args, **kwargs):
logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format(
self.func, self.callback, self.errorCallback))
logger.debug('args={0}, kwargs={1}'.format(args, kwargs))
self.pool.apply_async(self.func, args=args, kwds=kwargs, callback=self.callback, error_callback=self.errorCallback)
logger.debug(
"Before apply_async, func={0}, callback={1}, errorCallback={2}".format(
self.func, self.callback, self.errorCallback
)
)
logger.debug("args={0}, kwargs={1}".format(args, kwargs))
self.pool.apply_async(
self.func,
args=args,
kwds=kwargs,
callback=self.callback,
error_callback=self.errorCallback,
)
self.pool.close()
logger.debug('After apply_async')
logger.debug("After apply_async")
class ActorWrapper:
def __init__(self, name, method):
self.name = name
self.method = method
@trace_unhandled_exceptions
def run(self, *args, **kwargs):
logger.debug('In actor wrapper for {0}'.format(self.name))
logger.debug('args={0}, kwargs={1}, method={2}'.format(args, kwargs, self.method))
logger.debug("In actor wrapper for {0}".format(self.name))
logger.debug(
"args={0}, kwargs={1}, method={2}".format(args, kwargs, self.method)
)
inData = args[0]
outData = self.method(**inData)
return outData
class PythonActor(AbstractActor):
def __init__(self, parent=None, name='Python Actor', errorHandler=None, script=None, **kw):
def __init__(
self, parent=None, name="Python Actor", errorHandler=None, script=None, **kw
):
super().__init__(parent=parent, name=name, **kw)
self.parentErrorHandler = errorHandler
self.listErrorHandler = []
......@@ -150,23 +159,27 @@ class PythonActor(AbstractActor):
def trigger(self, inData):
self.setStarted()
self.inData = dict(inData)
self.uploadInDataToMongo(actorData={'inData': inData}, script=self.script)
logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(inData)))
self.uploadInDataToMongo(actorData={"inData": inData}, script=self.script)
logger.debug(
"In trigger {0}, inData = {1}".format(self.name, pprint.pformat(inData))
)
if isinstance(inData, ActorWrapperException):
logger.error('Error from previous actor! Not running actor {0}'.format(self.name))
logger.error(
"Error from previous actor! Not running actor {0}".format(self.name)
)
if self.parentErrorHandler is not None:
workflowException = inData
oldInData = workflowException.data
exceptionDict = {
'errorMessage': workflowException.errorMessage,
'traceBack': workflowException.traceBack.split('\n'),
"errorMessage": workflowException.errorMessage,
"traceBack": workflowException.traceBack.split("\n"),
}
oldInData['WorkflowException'] = exceptionDict
oldInData["WorkflowException"] = exceptionDict
self.parentErrorHandler.triggerOnError(oldInData)
try:
module = importlib.import_module(os.path.splitext(self.script)[0])
except Exception as e:
logger.error('Error when trying to import script {0}'.format(self.script))
logger.error("Error when trying to import script {0}".format(self.script))
time.sleep(1)
self.errorHandler(inData, e)
else:
......@@ -180,9 +193,9 @@ class PythonActor(AbstractActor):
self.af.call(self.inData)
def errorHandler(self, inData, exception):
logger.error('Error when running actor {0}!'.format(self.name))
logger.error("Error when running actor {0}!".format(self.name))
self.setFinished()
errorMessage = '{0}'.format(exception)
errorMessage = "{0}".format(exception)
logger.exception(errorMessage)
traceBack = traceback.format_exc().split("\n")
# workflowException = WorkflowException(
......@@ -190,9 +203,9 @@ class PythonActor(AbstractActor):
# traceBack=traceBack
# )
outData = dict(inData)
outData['WorkflowException'] = {
outData["WorkflowException"] = {
"errorMessage": errorMessage,
"traceBack": traceBack
"traceBack": traceBack,
}
logger.error(exception)
for errorHandler in self.listErrorHandler:
......@@ -201,34 +214,46 @@ class PythonActor(AbstractActor):
self.parentErrorHandler.triggerOnError(outData)
def triggerDownStreamActor(self, inData={}):
logger.debug('In triggerDownStreamActor for {0}'.format(self.name))
logger.debug("In triggerDownStreamActor for {0}".format(self.name))
self.setFinished()
if isinstance(inData, ActorWrapperException):
logger.error('Error from previous actor! Not running down stream actors {0}'.format([actor.name for actor in self.listDownStreamActor]))
logger.error(
"Error from previous actor! Not running down stream actors {0}".format(
[actor.name for actor in self.listDownStreamActor]
)
)
workflowException = inData
oldInData = workflowException.data
exceptionDict = {
'errorMessage': workflowException.errorMessage,
'traceBack': workflowException.traceBack.split('\n'),
"errorMessage": workflowException.errorMessage,
"traceBack": workflowException.traceBack.split("\n"),
}
oldInData['WorkflowException'] = exceptionDict
self.uploadOutDataToMongo(actorData={
'stopTime': datetime.datetime.now(),
'status': 'error',
'outData': exceptionDict
})
oldInData["WorkflowException"] = exceptionDict
self.uploadOutDataToMongo(
actorData={
"stopTime": datetime.datetime.now(),
"status": "error",
"outData": exceptionDict,
}
)
for errorHandler in self.listErrorHandler:
errorHandler.trigger(oldInData)
if self.parentErrorHandler is not None:
logger.error('Trigger on error on errorHandler "{0}"'.format(self.parentErrorHandler.name))
logger.error(
'Trigger on error on errorHandler "{0}"'.format(
self.parentErrorHandler.name
)
)
self.parentErrorHandler.triggerOnError(inData=oldInData)
else:
outData = dict(inData)
self.uploadOutDataToMongo(actorData={
'stopTime': datetime.datetime.now(),
'status': 'finished',
'outData': outData
})
self.uploadOutDataToMongo(
actorData={
"stopTime": datetime.datetime.now(),
"status": "finished",
"outData": outData,
}
)
if "workflowLogFile" in outData:
self.setMongoAttribute("logFile", outData["workflowLogFile"])
if "workflowDebugLogFile" in outData:
......@@ -236,6 +261,9 @@ class PythonActor(AbstractActor):
downstreamData = dict(self.inData)
downstreamData.update(outData)
for downStreamActor in self.listDownStreamActor:
logger.debug('In trigger {0}, triggering actor {1}, inData={2}'.format(self.name, downStreamActor.name, downstreamData))
logger.debug(
"In trigger {0}, triggering actor {1}, inData={2}".format(
self.name, downStreamActor.name, downstreamData
)
)
downStreamActor.trigger(downstreamData)
......@@ -27,13 +27,17 @@ from pypushflow import UtilsMongoDb
from pypushflow.AbstractActor import AbstractActor
class RequestStatus(AbstractActor):
def __init__(self, parent=None, name='Request status', status=None, **kw):
class RequestStatus(AbstractActor):
def __init__(self, parent=None, name="Request status", status=None, **kw):
super().__init__(parent=parent, name=name, **kw)
self.status=status
self.status = status
def trigger(self, inData):
if self.parent is not None and hasattr(self.parent, 'mongoId') and self.status is not None:
if (
self.parent is not None
and hasattr(self.parent, "mongoId")
and self.status is not None
):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, self.status)
super().trigger(inData=inData)
......@@ -26,13 +26,20 @@ __date__ = "28/05/2019"
from pypushflow.AbstractActor import AbstractActor
import logging
logger = logging.getLogger('pypushflow')
logger = logging.getLogger("pypushflow")
class RouterActor(AbstractActor):
def __init__(self, parent=None, errorHandler=None, name='Router',
itemName=None, listPort=None, **kw):
class RouterActor(AbstractActor):
def __init__(
self,
parent=None,
errorHandler=None,
name="Router",
itemName=None,
listPort=None,
**kw
):
super().__init__(parent=parent, name=name, **kw)
self.errorHandler = errorHandler
self.name = name
......@@ -43,14 +50,16 @@ class RouterActor(AbstractActor):
self.listPort = listPort
self.dictValues = {}
def connect(self, actor, expectedValue='other'):
if expectedValue != 'other' and not expectedValue in self.listPort:
def connect(self, actor, expectedValue="other"):
if expectedValue != "other" and not expectedValue in self.listPort:
raise RuntimeError(
'Port {0} not defined for router actor {1}!'.format(
expectedValue, self.name))
"Port {0} not defined for router actor {1}!".format(
expectedValue, self.name
)
)
if expectedValue in self.dictValues:
self.dictValues[expectedValue].append(actor)
else:
else:
self.dictValues[expectedValue] = [actor]
def trigger(self, inData):
......@@ -61,13 +70,13 @@ class RouterActor(AbstractActor):
if self.itemName in inData:
logger.debug(
'In router actor "{0}", itemName {1} in inData'.format(
self.name, self.itemName))
self.name, self.itemName
)
)
value = inData[self.itemName]
logger.debug(
'In router actor "{0}", value = {1}'.format(
self.name, value))
if value in [None, 'None', 'null']:
value = 'null'
logger.debug('In router actor "{0}", value = {1}'.format(self.name, value))
if value in [None, "None", "null"]:
value = "null"
elif type(value) == bool:
if value:
value = "true"
......@@ -77,13 +86,16 @@ class RouterActor(AbstractActor):
listActor = self.dictValues[value]
if listActor is None:
logger.debug('In router actor "{0}", actor is None')
if 'other' in self.dictValues:
listActor = self.dictValues['other']
if "other" in self.dictValues:
listActor = self.dictValues["other"]
else:
raise RuntimeError(
'No "other" port for router actor "{0}"'.format(self.name))
'No "other" port for router actor "{0}"'.format(self.name)
)
for actor in listActor:
logger.debug(
'In router actor "{0}", triggering actor "{1}"'.format(
self.name, actor.name))
self.name, actor.name
)
)
actor.trigger(inData)
......@@ -27,6 +27,5 @@ from pypushflow.AbstractActor import AbstractActor
class StartActor(AbstractActor):
def __init__(self, parent=None, name='Start actor', **kw):
def __init__(self, parent=None, name="Start actor", **kw):
super().__init__(parent=parent, name=name, **kw)
......@@ -29,18 +29,19 @@ from pypushflow import UtilsMongoDb
from pypushflow import Submodel
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger('pypushflow')
logger = logging.getLogger("pypushflow")
class StopActor(ThreadCountingActor):
def __init__(self, parent=None, errorHandler=None, name='Stop actor', **kw):
def __init__(self, parent=None, errorHandler=None, name="Stop actor", **kw):
super().__init__(name=name, parent=parent, **kw)
self.errorHandler = errorHandler
self.outData = None
def trigger(self, inData):
logger.debug('In trigger {0}, errorHandler = {1}'.format(self.name, self.errorHandler))
logger.debug(
"In trigger {0}, errorHandler = {1}".format(self.name, self.errorHandler)
)
if self.parent is not None and not isinstance(self.parent, Submodel.Submodel):
# Parent is a Workflow
self.outData = inData
......@@ -51,10 +52,18 @@ class StopActor(ThreadCountingActor):
def join(self, timeout=7200):
if self.parent is not None:
logger.debug('In {0}, parent {1}, before wait_threads_finished'.format(self.name, self.parent.name))
logger.debug(
"In {0}, parent {1}, before wait_threads_finished".format(
self.name, self.parent.name
)
)
success = self._wait_threads_finished(timeout=timeout)
if self.parent is not None:
logger.debug('In {0}, parent {1}, after wait_threads_finished'.format(self.name, self.parent.name))
logger.debug(
"In {0}, parent {1}, after wait_threads_finished".format(
self.name, self.parent.name
)
)
self._finalizeInMongo(success)
return success
......@@ -62,8 +71,14 @@ class StopActor(ThreadCountingActor):
if self.parent is None:
return
if success:
logger.debug('In {0}, parent {1}, finished'.format(self.name, self.parent.name))
UtilsMongoDb.closeMongo(self.parent.mongoId, status='finished')
logger.debug(
"In {0}, parent {1}, finished".format(self.name, self.parent.name)
)
UtilsMongoDb.closeMongo(self.parent.mongoId, status="finished")
else:
logger.error('In {0}, parent {1}, timeout detected'.format(self.name, self.parent.name))
UtilsMongoDb.closeMongo(self.parent.mongoId, status='timeout')
logger.error(
"In {0}, parent {1}, timeout detected".format(
self.name, self.parent.name
)
)
UtilsMongoDb.closeMongo(self.parent.mongoId, status="timeout")
......@@ -31,9 +31,12 @@ try:
import pymongo
import bson
from bson.objectid import ObjectId
USE_MONGODB = True
except:
print("Error when trying to import pymongo and/or bson - no MongoDB connection possible")
print(
"Error when trying to import pymongo and/or bson - no MongoDB connection possible"
)
USE_MONGODB = False
......@@ -44,7 +47,7 @@ def getDateTimeString():
def getMongoUrl():
mongoUrl = None
if USE_MONGODB:
mongoUrl = os.environ.get('PYPUSHFLOW_MONGOURL', None)
mongoUrl = os.environ.get("PYPUSHFLOW_MONGOURL", None)
return mongoUrl
......@@ -52,21 +55,21 @@ def initMongo(name):
workflowId = None
mongoUrl = getMongoUrl()
if mongoUrl is not None:
initiator = os.environ.get('PYPUSHFLOW_INITIATOR', 'Unknown')
host = os.environ.get('PYPUSHFLOW_HOST', 'Unknown')
port = os.environ.get('PYPUSHFLOW_PORT', 'Unknown')
objectId = os.environ.get('PYPUSHFLOW_OBJECTID', str(bson.objectid.ObjectId()))
initiator = os.environ.get("PYPUSHFLOW_INITIATOR", "Unknown")
host = os.environ.get("PYPUSHFLOW_HOST", "Unknown")
port = os.environ.get("PYPUSHFLOW_PORT", "Unknown")
objectId = os.environ.get("PYPUSHFLOW_OBJECTID", str(bson.objectid.ObjectId()))
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
workflowData = {
'_id': bson.objectid.ObjectId(objectId),
'Request ID': objectId,
'startTime': getDateTimeString(),
'initiator': initiator,
'host': host,
'port': port,
'name': name,
'status': 'started',
'actors': []
"_id": bson.objectid.ObjectId(objectId),
"Request ID": objectId,
"startTime": getDateTimeString(),
"initiator": initiator,
"host": host,
"port": port,
"name": name,
"status": "started",
"actors": [],
}
insertOneResult = collection.insert_one(workflowData)
workflowId = insertOneResult.inserted_id
......@@ -77,29 +80,29 @@ def setMongoStatus(workflowId, status):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({'_id': workflowId})
dictWorkflow['status'] = status
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
dictWorkflow = collection.find_one({"_id": workflowId})
dictWorkflow["status"] = status
collection.update_one({"_id": workflowId}, {"$set": dictWorkflow}, upsert=False)
def setMongoAttribute(workflowId, attribute, value):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({'_id': workflowId})
dictWorkflow = collection.find_one({"_id": workflowId})
dictWorkflow[attribute] = value
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
collection.update_one({"_id": workflowId}, {"$set": dictWorkflow}, upsert=False)
def closeMongo(workflowId, status='finished'):
def closeMongo(workflowId, status="finished"):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({'_id': workflowId})
if dictWorkflow['status'] != 'error':
dictWorkflow['status'] = status
dictWorkflow['stopTime'] = getDateTimeString()
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
dictWorkflow = collection.find_one({"_id": workflowId})