Commit 95dd2ca5 authored by Olof Svensson's avatar Olof Svensson
Browse files

Initial commit

parents
<component name="ProjectDictionaryState">
<dictionary name="svensson">
<words>
<w>indata</w>
<w>pypushflow</w>
</words>
</dictionary>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N806" />
<option value="N803" />
<option value="N802" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (edna2)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/pypushflow.iml" filepath="$PROJECT_DIR$/.idea/pypushflow.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/pypushflow" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="projectConfiguration" value="pytest" />
<option name="PROJECT_TEST_RUNNER" value="pytest" />
</component>
</module>
\ No newline at end of file
This diff is collapsed.
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import logging
logger = logging.getLogger('pypushflow')
class AbstractActor(object):
def __init__(self, name=None):
if name is None:
raise RuntimeError('Actor name is None!')
self.name = name
self.listDownStreamActor = []
def connect(self, actor):
logger.debug('Connecting actor "{0}" to actor "{1}"'.format(
self.name, actor.name
))
self.listDownStreamActor.append(actor)
def trigger(self, inData):
for actor in self.listDownStreamActor:
logger.debug('In actor "{0}", triggering actor "{1}"'.format(
self.name, actor.name
))
actor.trigger(inData)
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow.AbstractActor import AbstractActor
class ErrorHandler(AbstractActor):
def __init__(self, name='Error handler'):
AbstractActor.__init__(self, name)
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow.AbstractActor import AbstractActor
class ForkActor(AbstractActor):
def __init__(self, name='Fork actor'):
AbstractActor.__init__(self, name)
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow.AbstractActor import AbstractActor
class JoinActor(AbstractActor):
def __init__(self, name='Join actor'):
AbstractActor.__init__(self, name)
self.numberOfThreads = 0
self.listInData = []
def increaseNumberOfThreads(self):
self.numberOfThreads += 1
def trigger(self, inData):
self.listInData.append(inData)
if len(self.listInData) == self.numberOfThreads:
newInData = {}
for data in self.listInData:
newInData.update(data)
for actor in self.listDownStreamActor:
actor.trigger(newInData)
\ No newline at end of file
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import os
import time
import pprint
import logging
import traceback
import functools
import importlib
import multiprocessing
from pypushflow.AbstractActor import AbstractActor
logger = logging.getLogger('pypushflow')
class WorkflowException(Exception):
def __init__(self, errorMessage="", traceBack="", data={}, msg=None):
super(WorkflowException, self).__init__(msg)
self.errorMessage = errorMessage
self.data = data
self.traceBack = traceBack
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
outData = func(*args, **kwargs)
except Exception as e:
errorMessage = '{0}'.format(e)
logger.exception(errorMessage)
traceBack = traceback.format_exc()
return WorkflowException(
errorMessage=errorMessage,
traceBack=traceBack,
data=args[1]
)
return outData
return wrapped_func
class AsyncFactory:
def __init__(self, func, callback=None, errorCallback=None):
self.func = func
self.callback = callback
self.errorCallback = errorCallback
self.pool = multiprocessing.Pool()
def call(self, *args, **kwargs):
logger.debug('Before apply_async, func={0}, callback={1}, errorCallback={2}'.format(
self.func, self.callback, self.errorCallback))
logger.debug('args={0}, kwargs={1}'.format(args, kwargs))
if self.callback is None:
self.pool.apply_async(self.func, args, kwargs)
elif self.errorCallback is None:
self.pool.apply_async(self.func, args, kwargs, self.callback)
else:
self.pool.apply_async(self.func, args, kwargs, self.callback, self.errorCallback)
logger.debug('After apply_async')
def wait(self):
self.pool.close()
self.pool.join()
class ActorWrapper(object):
def __init__(self, name, method):
self.name = name
self.method = method
@trace_unhandled_exceptions
def run(self, *args, **kwargs):
logger.debug('In actor wrapper for {0}'.format(self.name))
logger.debug('args={0}, kwargs={1}, method={2}'.format(args, kwargs, self.method))
inData = args[0]
outData = self.method(**inData)
inData.update(outData)
return inData
class PythonActor(AbstractActor):
def __init__(self, name='Python Actor', parent=None, script=None):
AbstractActor.__init__(self, name)
self.parent = parent
self.listErrorHandler = []
# Import script
module = importlib.import_module(os.path.splitext(script)[0])
self.actorWrapper = ActorWrapper(name, module.run)
self.outData = None
self.af = None
def connectOnError(self, errorHandler):
self.listErrorHandler.append(errorHandler)
def trigger(self, inData):
logger.debug('In trigger {0}, inData = {1}'.format(self.name, pprint.pformat(inData)))
if isinstance(inData, WorkflowException):
logger.error('Error from previous actor! Not running actor {0}'.format(self.name))
if self.parent is not None:
workflowException = inData
oldInData = workflowException.data
exceptionDict = {
'errorMessage': workflowException.errorMessage,
'traceBack': workflowException.traceBack,
}
oldInData['WorkflowException'] = exceptionDict
self.triggerOnError(oldInData)
elif len(self.listDownStreamActor) == 0:
self.af = AsyncFactory(self.actorWrapper.run)
self.af.call(inData)
# elif self.parent is None:
# self.af = AsyncFactory(self.actorWrapper.run,
# callback=self.triggerDownStreamActor,
# errorCallback=self.errorHandler
# )
# self.af.call(inData)
else:
self.af = AsyncFactory(self.actorWrapper.run,
callback=self.triggerDownStreamActor,
errorCallback=self.errorHandler)
self.af.call(inData)
def errorHandler(self, exception):
logger.error('Error when running actor {0}!'.format(self.name))
workflowException = WorkflowException(
errorMessage=exception,
traceBack=None,
data=None
)
inData = {'WorkflowException': workflowException}
logger.error(exception)
for errorHandler in self.listErrorHandler:
errorHandler.trigger(inData)
if self.parent is not None:
self.parent.triggerOnError(inData)
def triggerDownStreamActor(self, inData={}):
logger.debug('In triggerDownStreamActor for {0}'.format(self.name))
if isinstance(inData, WorkflowException):
logger.error('Error from previous actor! Not running down stream actors {0}'.format([actor.name for actor in self.listDownStreamActor]))
workflowException = inData
oldInData = workflowException.data
exceptionDict = {
'errorMessage': workflowException.errorMessage,
'traceBack': workflowException.traceBack,
}
oldInData['WorkflowException'] = exceptionDict
for errorHandler in self.listErrorHandler:
errorHandler.trigger(oldInData)
if self.parent is not None:
logger.error('Trigger on error on parent {0}'.format(self.parent.name))
self.parent.triggerOnError(inData=oldInData)
else:
for downStreamActor in self.listDownStreamActor:
logger.debug('In trigger {0}, triggering actor {1}, inData={2}'.format(self.name, downStreamActor.name, inData))
downStreamActor.trigger(inData)
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow.AbstractActor import AbstractActor
class RequestStatus(AbstractActor):
def __init__(self, name='Request status'):
AbstractActor.__init__(self, name)
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow.AbstractActor import AbstractActor
import logging
logger = logging.getLogger('pypushflow')
class RouterActor(AbstractActor):
def __init__(self, parent=None, name='Router', itemName=None, listPort=[]):
AbstractActor.__init__(self, name)
self.parent=parent
self.name = name
self.itemName = itemName
self.listPort = listPort
self.dictValues = {}
def connect(self, actor, expectedValue='other'):
if expectedValue != 'other' and not expectedValue in self.listPort:
raise RuntimeError('Port {0} not defined for router actor {1}!'.format(expectedValue, self.name))
if expectedValue in self.dictValues:
self.dictValues[expectedValue].append(actor)
else:
self.dictValues[expectedValue] = [actor]
def trigger(self, inData):
logger.debug('In router actor "{0}"'.format(self.name))
listActor = None
if self.itemName in inData and not inData[self.itemName] in [None, 'None', 'null']:
logger.debug('In router actor "{0}", itemName {1} in inData'.format(self.name, self.itemName))
value = inData[self.itemName]
logger.debug('In router actor "{0}", value = {1}'.format(self.name, value))
if not isinstance(value, dict) and value in self.dictValues:
listActor = self.dictValues[value]
if listActor is None:
logger.debug('In router actor "{0}", actor is None')
if 'other' in self.dictValues:
listActor = self.dictValues['other']
else:
raise RuntimeError('No "other" port for router actor "{0}"'.format(self.name))
for actor in listActor:
logger.debug('In router actor "{0}", triggering actor "{1}"'.format(self.name, actor.name))
actor.trigger(inData)
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow.AbstractActor import AbstractActor
class StartActor(AbstractActor):
def __init__(self, name='Start actor'):
AbstractActor.__init__(self, name)
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import logging
import multiprocessing
logger = logging.getLogger('pypushflow')
class StopActor(object):
def __init__(self, parent=None, name='Stop actor'):
self.parent=parent
self.name = name
self.lock = multiprocessing.Lock()
self.lock.acquire()
self.outData = None
def trigger(self, inData):
logger.debug('In trigger {0}, parent = {1}'.format(self.name, self.parent))
if self.parent is not None:
self.parent.parent.stopActor.trigger(inData)
else:
self.outData = inData
self.lock.release()
def join(self, timeout=7200):
self.lock.acquire(timeout=timeout)
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.