GitLab will be upgraded on June 23rd evening. During the upgrade the service will be unavailable, sorry for the inconvenience.

Commit fc455539 authored by Operator for ID30's avatar Operator for ID30

Now updates the mongodb database in case of timeout

parent 93c4bd95
......@@ -46,16 +46,21 @@ class StopActor(object):
logger.debug('In trigger {0}, errorHandler = {1}'.format(self.name, self.errorHandler))
# Check if parent is workflow
if self.parent is not None and not isinstance(self.parent, Submodel.Submodel):
UtilsMongoDb.closeMongo(self.parent.mongoId)
UtilsMongoDb.closeMongo(self.parent.mongoId, status='finished')
self.outData = inData
logger.error('In {0}, parent {1}, before lock.release'.format(self.name, self.parent.name))
logger.debug('In {0}, parent {1}, before lock.release'.format(self.name, self.parent.name))
self.lock.release()
logger.error('In {0}, parent {1}, after lock.release'.format(self.name, self.parent.name))
logger.debug('In {0}, parent {1}, after lock.release'.format(self.name, self.parent.name))
else:
self.errorHandler.errorHandler.stopActor.trigger(inData)
def join(self, timeout=7200):
logger.error('In {0}, parent {1}, before lock.acquire'.format(self.name, self.parent.name))
self.lock.acquire(timeout=timeout)
logger.error('In {0}, parent {1}, after lock.acquire'.format(self.name, self.parent.name))
logger.debug('In {0}, parent {1}, before lock.acquire'.format(self.name, self.parent.name))
success = self.lock.acquire(timeout=timeout)
logger.debug('In {0}, parent {1}, after lock.acquire'.format(self.name, self.parent.name))
if not success:
logger.error('In {0}, parent {1}, timeout detected'.format(self.name, self.parent.name))
if self.parent is not None:
UtilsMongoDb.closeMongo(self.parent.mongoId, status='timeout')
......@@ -73,13 +73,13 @@ def setMongoStatus(workflowId, status):
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
def closeMongo(workflowId):
def closeMongo(workflowId, status='finished'):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({'_id': workflowId})
if dictWorkflow['status'] != 'error':
dictWorkflow['status'] = 'finished'
dictWorkflow['status'] = status
dictWorkflow['stopTime'] = getDateTimeString()
collection.update_one({'_id': workflowId}, {"$set": dictWorkflow}, upsert=False)
......
......@@ -135,6 +135,23 @@ class Submodel2(Submodel):
self.pythonActor.connect(self.getPort('Out'))
class Submodel3(Submodel):
"""
Submodel containing one python actor which has a long execution time
"""
def __init__(self, parent, name):
Submodel.__init__(self, parent, name=name)
self.pythonActor = PythonActor(
parent=self,
script='pypushflow.test.pythonLongExecutionTest.py',
name='Python Long Execution Test',
errorHandler=self
)
self.getPort('In').connect(self.pythonActor)
self.pythonActor.connect(self.getPort('Out'))
class Workflow4(Workflow):
"""
Workflow containing one start actor,
......@@ -153,3 +170,23 @@ class Workflow4(Workflow):
self.startActor.trigger(inData)
self.stopActor.join(timeout=5)
return self.stopActor.outData
class Workflow5(Workflow):
"""
Workflow containing one start actor,
one submodel which has a long execution and one stop actor with short timeout.
"""
def __init__(self, name):
Workflow.__init__(self, name)
self.startActor = StartActor(self)
self.submodel3 = Submodel3(self, name='Submodel 3')
self.stopActor = StopActor(self)
self.startActor.connect(self.submodel3.getPort('In'))
self.submodel3.getPort('Out').connect(self.stopActor)
def run(self, inData):
self.startActor.trigger(inData)
self.stopActor.join(timeout=5)
return self.stopActor.outData
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import time
def run(name, **kwargs):
reply = None
# This actor sleeps for a long time (1 minute)
time.sleep(10)
reply = 'Hello ' + name + '!'
return {'reply': reply}
......@@ -30,6 +30,7 @@ import unittest
from pypushflow.test.TestWorkflow import Workflow1
from pypushflow.test.TestWorkflow import Workflow2
from pypushflow.test.TestWorkflow import Workflow3
from pypushflow.test.TestWorkflow import Workflow5
logging.basicConfig(level=logging.DEBUG)
......@@ -70,3 +71,8 @@ class TestWorkflows(unittest.TestCase):
self.assertIsNotNone(outData)
self.assertTrue('WorkflowException' in outData)
def test_Workflow5(self):
testWorkflow5 = Workflow5('Test workflow 5')
inData = {'name': 'Dog'}
outData = testWorkflow5.run(inData)
self.assertIsNone(outData)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment