GitLab will be upgraded on June 23rd evening. During the upgrade the service will be unavailable, sorry for the inconvenience.

Commit e7022207 authored by payno's avatar payno

format file to fit `black` style

parent f01b6f59
Pipeline #31666 passed with stages
in 38 seconds
......@@ -17,12 +17,12 @@
# -- Project information -----------------------------------------------------
project = 'pypushflow'
copyright = '2020, O. Svensson, H. Payno'
author = 'O. Svensson, H. Payno'
project = "pypushflow"
copyright = "2020, O. Svensson, H. Payno"
author = "O. Svensson, H. Payno"
# The full version, including alpha/beta/rc tags
release = '0.1'
release = "0.1"
# -- General configuration ---------------------------------------------------
......@@ -30,19 +30,17 @@ release = '0.1'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
]
extensions = ["sphinx.ext.autodoc"]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = 'python'
language = "python"
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
......@@ -55,9 +53,9 @@ exclude_patterns = []
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
html_theme = "alabaster"
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
\ No newline at end of file
html_static_path = ["_static"]
......@@ -28,16 +28,17 @@ from pypushflow.addon import utils
from pypushflow.addon.classes import BaseActorAddOn
logger = logging.getLogger('pypushflow')
logger = logging.getLogger("pypushflow")
class AbstractActor(object):
"""
TODO
"""
def __init__(self, parent=None, name=None):
if name is None:
raise RuntimeError('Actor name is None!')
raise RuntimeError("Actor name is None!")
self.name = name
self.listDownStreamActor = []
self.parent = parent
......@@ -48,16 +49,16 @@ class AbstractActor(object):
self._add_ons.append(add_on_class())
def connect(self, actor):
logger.debug('Connecting actor "{0}" to actor "{1}"'.format(
self.name, actor.name
))
logger.debug(
'Connecting actor "{0}" to actor "{1}"'.format(self.name, actor.name)
)
self.listDownStreamActor.append(actor)
def trigger(self, inData):
for actor in self.listDownStreamActor:
logger.debug('In actor "{0}", triggering actor "{1}"'.format(
self.name, actor.name
))
logger.debug(
'In actor "{0}", triggering actor "{1}"'.format(self.name, actor.name)
)
self._process_pre_trigger_add_on(inData)
actor.trigger(inData)
self._process_post_trigger_add_on()
......@@ -82,6 +83,7 @@ class AbstractActor(object):
for _, classes in utils.get_registered_add_ons_classes().items():
for class_ in classes:
import inspect
if BaseActorAddOn in (inspect.getmro(class_)):
add_ons.append(class_)
return add_ons
......@@ -23,14 +23,13 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
#from pypushflow import UtilsMongoDb
# from pypushflow import UtilsMongoDb
from pypushflow.AbstractActor import AbstractActor
class ErrorHandler(AbstractActor):
def __init__(self, parent=None, name='Error handler'):
def __init__(self, parent=None, name="Error handler"):
AbstractActor.__init__(self, parent=parent, name=name)
def trigger(self, inData):
......
......@@ -27,6 +27,5 @@ from pypushflow.AbstractActor import AbstractActor
class ForkActor(AbstractActor):
def __init__(self, parent=None, name='Fork actor'):
def __init__(self, parent=None, name="Fork actor"):
AbstractActor.__init__(self, parent=parent, name=name)
......@@ -27,8 +27,7 @@ from pypushflow.AbstractActor import AbstractActor
class JoinActor(AbstractActor):
def __init__(self, parent=None, name='Join actor'):
def __init__(self, parent=None, name="Join actor"):
AbstractActor.__init__(self, parent=parent, name=name)
self.numberOfThreads = 0
self.listInData = []
......@@ -63,9 +62,13 @@ class JoinUntilStopSignal(AbstractActor):
self.listDownStreamActor.append(actor)
def trigger(self, inData):
if type(inData) is dict and 'sig_type' in inData and inData['sig_type'] == 'stop':
if (
type(inData) is dict
and "sig_type" in inData
and inData["sig_type"] == "stop"
):
self._can_stop = True
self._nprocess_waited = inData['n_process']
self._nprocess_waited = inData["n_process"]
else:
self._nprocess_received += 1
......
This diff is collapsed.
......@@ -27,6 +27,5 @@ from pypushflow.AbstractActor import AbstractActor
class RequestStatus(AbstractActor):
def __init__(self, parent, name='Request status'):
def __init__(self, parent, name="Request status"):
AbstractActor.__init__(self, parent=parent, name=name)
......@@ -26,12 +26,14 @@ __date__ = "28/05/2019"
from pypushflow.AbstractActor import AbstractActor
import logging
logger = logging.getLogger('pypushflow')
logger = logging.getLogger("pypushflow")
class RouterActor(AbstractActor):
def __init__(self, parent, errorHandler=None, name='Router', itemName=None, listPort=[]):
class RouterActor(AbstractActor):
def __init__(
self, parent, errorHandler=None, name="Router", itemName=None, listPort=[]
):
AbstractActor.__init__(self, parent, name)
self.errorHandler = errorHandler
self.name = name
......@@ -39,29 +41,47 @@ class RouterActor(AbstractActor):
self.listPort = listPort
self.dictValues = {}
def connect(self, actor, expectedValue='other'):
if expectedValue != 'other' and not expectedValue in self.listPort:
raise RuntimeError('Port {0} not defined for router actor {1}!'.format(expectedValue, self.name))
def connect(self, actor, expectedValue="other"):
if expectedValue != "other" and not expectedValue in self.listPort:
raise RuntimeError(
"Port {0} not defined for router actor {1}!".format(
expectedValue, self.name
)
)
if expectedValue in self.dictValues:
self.dictValues[expectedValue].append(actor)
else:
else:
self.dictValues[expectedValue] = [actor]
def trigger(self, inData):
logger.debug('In router actor "{0}"'.format(self.name))
listActor = None
if self.itemName in inData and not inData[self.itemName] in [None, 'None', 'null']:
logger.debug('In router actor "{0}", itemName {1} in inData'.format(self.name, self.itemName))
if self.itemName in inData and not inData[self.itemName] in [
None,
"None",
"null",
]:
logger.debug(
'In router actor "{0}", itemName {1} in inData'.format(
self.name, self.itemName
)
)
value = inData[self.itemName]
logger.debug('In router actor "{0}", value = {1}'.format(self.name, value))
if not isinstance(value, dict) and value in self.dictValues:
listActor = self.dictValues[value]
if listActor is None:
logger.debug('In router actor "{0}", actor is None')
if 'other' in self.dictValues:
listActor = self.dictValues['other']
if "other" in self.dictValues:
listActor = self.dictValues["other"]
else:
raise RuntimeError('No "other" port for router actor "{0}"'.format(self.name))
raise RuntimeError(
'No "other" port for router actor "{0}"'.format(self.name)
)
for actor in listActor:
logger.debug('In router actor "{0}", triggering actor "{1}"'.format(self.name, actor.name))
logger.debug(
'In router actor "{0}", triggering actor "{1}"'.format(
self.name, actor.name
)
)
actor.trigger(inData)
......@@ -30,5 +30,6 @@ class StartActor(AbstractActor):
"""
TODO
"""
def __init__(self, parent=None, name='Start actor'):
def __init__(self, parent=None, name="Start actor"):
AbstractActor.__init__(self, parent=parent, name=name)
......@@ -28,12 +28,11 @@ import multiprocessing
# from pypushflow import UtilsMongoDb
logger = logging.getLogger('pypushflow')
logger = logging.getLogger("pypushflow")
class StopActor(object):
def __init__(self, parent=None, errorHandler=None, name='Stop actor'):
def __init__(self, parent=None, errorHandler=None, name="Stop actor"):
self.errorHandler = errorHandler
self.name = name
self.lock = multiprocessing.Lock()
......@@ -42,7 +41,9 @@ class StopActor(object):
self.parent = parent
def trigger(self, inData):
logger.debug('In trigger {0}, errorHandler = {1}'.format(self.name, self.errorHandler))
logger.debug(
"In trigger {0}, errorHandler = {1}".format(self.name, self.errorHandler)
)
# if self.parent is not None and hasattr(self.parent, 'mongoId'):
# UtilsMongoDb.closeMongo(self.parent.mongoId)
if self.errorHandler is not None:
......
......@@ -25,13 +25,13 @@ __date__ = "28/05/2019"
import logging
logger = logging.getLogger('pypushflow')
logger = logging.getLogger("pypushflow")
class Port(object):
class Port(object):
def __init__(self, errorHandler, name):
self.name = errorHandler.name + '.' + name
self.name = errorHandler.name + "." + name
self.errorHandler = errorHandler
self.listActor = []
self.inPortTrigger = None
......@@ -47,16 +47,23 @@ class Port(object):
logger.debug("In {0} trigger".format(self.name))
if len(self.listActor) > 0:
for actor in self.listActor:
logger.debug("In trigger {0} -> actorName {1}".format(self.errorHandler.name, actor.name))
logger.debug(
"In trigger {0} -> actorName {1}".format(
self.errorHandler.name, actor.name
)
)
actor.trigger(*args, **kwargs)
if self.inPortTrigger is not None:
logger.debug("In {0} trigger, trigger = {1}".format(self.errorHandler.name, self.inPortTrigger))
logger.debug(
"In {0} trigger, trigger = {1}".format(
self.errorHandler.name, self.inPortTrigger
)
)
self.inPortTrigger(*args, **kwargs)
class Submodel(object):
def __init__(self, parent, errorHandler=None, name=None, portNames=['In', 'Out']):
def __init__(self, parent, errorHandler=None, name=None, portNames=["In", "Out"]):
self.parent = parent
# self.mongoId = self.parent.mongoId
self.name = name
......@@ -67,23 +74,35 @@ class Submodel(object):
self.dictPort[portName] = Port(self, portName)
def getActorPath(self):
return self.parent.getActorPath() + '/' + self.name.replace('%', ' ')
return self.parent.getActorPath() + "/" + self.name.replace("%", " ")
def getPort(self, portName):
logger.debug("In {0} getPort, portName = {1}".format(self.name, portName))
return self.dictPort[portName]
def connect(self, actor, portName='Out'):
logger.debug("In {0} connect, portName = {2} -> actorName = {1}".format(self.name, actor.name, portName))
def connect(self, actor, portName="Out"):
logger.debug(
"In {0} connect, portName = {2} -> actorName = {1}".format(
self.name, actor.name, portName
)
)
self.dictPort[portName].connect(actor)
def connectOnError(self, actor):
logger.debug("In connectOnError in subModule {0}, actor name {1}".format(self.name, actor.name))
logger.debug(
"In connectOnError in subModule {0}, actor name {1}".format(
self.name, actor.name
)
)
self.listOnErrorActor.append(actor)
def triggerOnError(self, *args, **kwargs):
for onErrorActor in self.listOnErrorActor:
logger.debug("In triggerOnError in subModule {0}, trigger actor {1}, inData = {2}".format(self.name, onErrorActor.name, args[0]))
logger.debug(
"In triggerOnError in subModule {0}, trigger actor {1}, inData = {2}".format(
self.name, onErrorActor.name, args[0]
)
)
onErrorActor.trigger(*args, **kwargs)
if self.errorHandler is not None:
self.errorHandler.triggerOnError(*args, **kwargs)
\ No newline at end of file
self.errorHandler.triggerOnError(*args, **kwargs)
......@@ -36,28 +36,28 @@ def getDateTimeString():
def getMongoUrl():
return os.environ.get('PYPUSHFLOW_MONGOURL', None)
return os.environ.get("PYPUSHFLOW_MONGOURL", None)
def initMongo(name):
workflowId = None
mongoUrl = getMongoUrl()
if mongoUrl is not None:
initiator = os.environ.get('PYPUSHFLOW_INITIATOR', 'Unknown')
host = os.environ.get('PYPUSHFLOW_HOST', 'Unknown')
port = os.environ.get('PYPUSHFLOW_PORT', 'Unknown')
objectId = os.environ.get('PYPUSHFLOW_OBJECTID', str(bson.objectid.ObjectId()))
initiator = os.environ.get("PYPUSHFLOW_INITIATOR", "Unknown")
host = os.environ.get("PYPUSHFLOW_HOST", "Unknown")
port = os.environ.get("PYPUSHFLOW_PORT", "Unknown")
objectId = os.environ.get("PYPUSHFLOW_OBJECTID", str(bson.objectid.ObjectId()))
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
workflowData = {
'_id': bson.objectid.ObjectId(objectId),
'Request ID': objectId,
'startTime': getDateTimeString(),
'initiator': initiator,
'host': host,
'port': port,
'name': name,
'status': 'started',
'actors': []
"_id": bson.objectid.ObjectId(objectId),
"Request ID": objectId,
"startTime": getDateTimeString(),
"initiator": initiator,
"host": host,
"port": port,
"name": name,
"status": "started",
"actors": [],
}
insertOneResult = collection.insert_one(workflowData)
workflowId = insertOneResult.inserted_id
......@@ -68,20 +68,20 @@ def setMongoStatus(workflowId, status):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({'_id': workflowId})
dictWorkflow['status'] = status
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
dictWorkflow = collection.find_one({"_id": workflowId})
dictWorkflow["status"] = status
collection.update_one({"_id": workflowId}, {"$set": dictWorkflow}, upsert=False)
def closeMongo(workflowId):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({'_id': workflowId})
if dictWorkflow['status'] != 'error':
dictWorkflow['status'] = 'finished'
dictWorkflow['stopTime'] = getDateTimeString()
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
dictWorkflow = collection.find_one({"_id": workflowId})
if dictWorkflow["status"] != "error":
dictWorkflow["status"] = "finished"
dictWorkflow["stopTime"] = getDateTimeString()
collection.update_one({"_id": workflowId}, {"$set": dictWorkflow}, upsert=False)
def initActor(name, workflowId, actorData={}, script=None):
......@@ -91,14 +91,14 @@ def initActor(name, workflowId, actorData={}, script=None):
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
actorId = ObjectId()
actor = {
'_id': actorId,
'startTime': getDateTimeString(),
'name': name,
'status': 'started',
'script': script
"_id": actorId,
"startTime": getDateTimeString(),
"name": name,
"status": "started",
"script": script,
}
actor.update(actorData)
collection.update({'_id': workflowId}, {'$push': {'actors': actor}})
collection.update({"_id": workflowId}, {"$push": {"actors": actor}})
return actorId
......@@ -106,9 +106,9 @@ def addDataToActor(workflowId, actorId, actorData={}):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({'_id': workflowId})
for actor in dictWorkflow['actors']:
if actor['_id'] == actorId:
dictWorkflow = collection.find_one({"_id": workflowId})
for actor in dictWorkflow["actors"]:
if actor["_id"] == actorId:
actor.update(actorData)
break
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
collection.update_one({"_id": workflowId}, {"$set": dictWorkflow}, upsert=False)
......@@ -19,7 +19,7 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson", 'H. Payno']
__authors__ = ["O. Svensson", "H. Payno"]
__license__ = "MIT"
__date__ = "28/05/2019"
......@@ -35,7 +35,7 @@ from pypushflow.addon import utils
from pypushflow.addon.classes import BaseWorkflowAddOn
logger = logging.getLogger('pypushflow')
logger = logging.getLogger("pypushflow")
class _BaseWorkflow:
......@@ -43,15 +43,16 @@ class _BaseWorkflow:
self._configuration = configuration or {}
self._add_ons = []
for add_on_class in self._getAddOnsClasses():
self._add_ons.append(add_on_class(workflow=self,
configuration=self._configuration
))
self._add_ons.append(
add_on_class(workflow=self, configuration=self._configuration)
)
def _getAddOnsClasses(self):
add_ons = []
for _, classes in utils.get_registered_add_ons_classes().items():
for class_ in classes:
import inspect
if BaseWorkflowAddOn in (inspect.getmro(class_)):
add_ons.append(class_)
return add_ons
......@@ -59,13 +60,18 @@ class _BaseWorkflow:
class Workflow(_BaseWorkflow):
"""TODO"""
def __init__(self, name):
super(Workflow, self).__init__()
self.name = name
self.listOnErrorActor = []
def connectOnError(self, actor):
logger.debug("In connectOnError in subModule {0}, actor name {1}".format(self.name, actor.name))
logger.debug(
"In connectOnError in subModule {0}, actor name {1}".format(
self.name, actor.name
)
)
self.listOnErrorActor.append(actor)
def triggerOnError(self, inData):
......@@ -75,7 +81,7 @@ class Workflow(_BaseWorkflow):
onErrorActor.trigger(inData)
def getActorPath(self):
return '/' + self.name
return "/" + self.name
class ProcessableWorkflow(_BaseWorkflow):
......@@ -95,11 +101,10 @@ class ProcessableWorkflow(_BaseWorkflow):
self._actor_factory = {}
for node in self._representation.nodes:
name = '-'.join((str(node.id), node._process_pt))
self._actor_factory[node] = ActorFactory(parent=None,
name=name,
node=node,
errorHandler=None)
name = "-".join((str(node.id), node._process_pt))
self._actor_factory[node] = ActorFactory(
parent=None, name=name, node=node, errorHandler=None
)
# deal with connect
for node in self._representation.nodes:
......@@ -123,7 +128,7 @@ class ProcessableWorkflow(_BaseWorkflow):
self._end_actor = StopActor()
if self.has_final_join():
self._join_actor = JoinUntilStopSignal('stop join')
self._join_actor = JoinUntilStopSignal("stop join")
connect_finals_nodes(self._join_actor)
self._join_actor.connect(self._end_actor)
else:
......@@ -143,10 +148,10 @@ class Converter:
"""
Write a Workflow to a python file which can be executed later
"""
def __init__(self, workflow, output_file):
if not isinstance(workflow, _BaseWorkflow):
raise TypeError(
'workflow should be an instance of `_BaseWorkflow`')
raise TypeError("workflow should be an instance of `_BaseWorkflow`")
self.workflow = workflow
self.output_file = output_file
......@@ -159,68 +164,88 @@ class Converter:
self._close_main_section()
def _write_main_section(self):
with open(self.output_file, 'a') as file_:
file_.write('\n\n')
file_.write('def main(input_data, channel):\n')
with open(self.output_file, "a") as file_:
file_.write("\n\n")
file_.write("def main(input_data, channel):\n")
def _close_main_section(self):
with open(self.output_file, 'a') as file_:
file_.write('\n\n')
with open(self.output_file, "a") as file_:
file_.write("\n\n")
def _write_import(self):
with open(self.output_file, 'a') as file_:
file_.write('\n')
with open(self.output_file, "a") as file_:
file_.write("\n")
for node in self.workflow._representation.nodes:
class_name, mod_name = node.get_class_name_and_module_name()
file_.write('import {}\n'.format(mod_name))
file_.write("import {}\n".format(mod_name))
# create logger
file_.write('{}\n{}\n'.format('import logging',
'_logger = logging.getLogger(__name__)'))
file_.write(
"{}\n{}\n".format(
"import logging", "_logger = logging.getLogger(__name__)"
)
)
# add import ignore process
file_.write('{}\n'.format('from tomwer.core.process.utils import IgnoreProcess'))
file_.write(
"{}\n".format("from tomwer.core.process.utils import IgnoreProcess")
)
def _write_processes_creation(self):
with open(self.output_file, 'a') as file_:
file_.write('\n')
with open(self.output_file, "a") as file_:
file_.write("\n")
for node in self.workflow._representation.nodes:
class_name, mod_name = node.get_class_name_and_module_name()
file_.write(' process_{} = {}.{}()\n'.format(node.id, mod_name, class_name))
if hasattr(node.class_instance, 'set_properties'):
file_.write(
" process_{} = {}.{}()\n".format(node.id, mod_name, class_name)
)
if hasattr(node.class_instance, "set_properties"):
# filter some orange properties