Commit 2902c292 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

replace UtilsMongoDb

parent 690d444f
Pipeline #54701 passed with stages
in 44 seconds
......@@ -25,7 +25,6 @@ __date__ = "28/05/2019"
import logging
from pypushflow import UtilsMongoDb
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger("pypushflow")
......@@ -54,30 +53,24 @@ class AbstractActor(ThreadCountingActor):
)
actor.trigger(inData)
def uploadInDataToMongo(self, actorData={}, script=None):
def uploadInDataToMongo(self, actorData=None, script=None):
if self.parent is not None:
if self.parent.mongoId is not None:
actorPath = self.getActorPath() + "/" + self.name
self.actorId = UtilsMongoDb.initActor(
workflowId=self.parent.mongoId,
name=actorPath,
actorData=actorData,
script=script,
)
def uploadOutDataToMongo(self, actorData={}, script=None):
if self.parent is not None:
if self.parent.mongoId is not None:
UtilsMongoDb.addDataToActor(
workflowId=self.parent.mongoId,
actorId=self.actorId,
actorData=actorData,
)
name = self.getActorPath() + "/" + self.name
if actorData:
info = dict(actorData)
else:
info = dict()
if script:
info["script"] = script
self.actorId = self.parent.db_client.startActor(name=name, info=info)
def uploadOutDataToMongo(self, actorData=None):
if actorData and self.actorId is not None:
self.parent.db_client.updateActorInfo(self.actorId, info=actorData)
def setMongoAttribute(self, attribute, value):
if self.parent is not None:
if self.parent.mongoId is not None:
UtilsMongoDb.setMongoAttribute(self.parent.mongoId, attribute, value)
if self.actorId is not None:
self.parent.db_client.updateActorInfo(self.actorId, info={attribute: value})
def getActorPath(self):
return self.parent.getActorPath()
......
......@@ -23,8 +23,6 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow import UtilsMongoDb
from pypushflow.AbstractActor import AbstractActor
......@@ -33,6 +31,6 @@ class ErrorHandler(AbstractActor):
super().__init__(parent=parent, name=name, **kw)
def trigger(self, inData):
if self.parent is not None and hasattr(self.parent, "mongoId"):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, "error")
if self.parent is not None:
self.parent.setStatus("error")
super().trigger(inData=inData)
......@@ -23,8 +23,6 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow import UtilsMongoDb
from pypushflow.AbstractActor import AbstractActor
......@@ -34,10 +32,6 @@ class RequestStatus(AbstractActor):
self.status = status
def trigger(self, inData):
if (
self.parent is not None
and hasattr(self.parent, "mongoId")
and self.status is not None
):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, self.status)
if self.status is not None and self.parent is not None:
self.parent.setStatus(self.status)
super().trigger(inData=inData)
......@@ -25,7 +25,6 @@ __date__ = "28/05/2019"
import logging
from pypushflow import UtilsMongoDb
from pypushflow import Submodel
from pypushflow.ThreadCountingActor import ThreadCountingActor
......@@ -74,11 +73,11 @@ class StopActor(ThreadCountingActor):
logger.debug(
"In {0}, parent {1}, finished".format(self.name, self.parent.name)
)
UtilsMongoDb.closeMongo(self.parent.mongoId, status="finished")
self.parent.setStatus("finished")
else:
logger.error(
"In {0}, parent {1}, timeout detected".format(
self.name, self.parent.name
)
)
UtilsMongoDb.closeMongo(self.parent.mongoId, status="timeout")
self.parent.setStatus("timeout")
......@@ -74,7 +74,6 @@ class Submodel:
thread_counter=None,
):
self.parent = parent
self.mongoId = self.parent.mongoId
self.name = name
if errorHandler is None:
self.errorHandler = parent
......@@ -87,6 +86,13 @@ class Submodel:
self, portName, thread_counter=thread_counter
)
@property
def db_client(self):
return self.parent.db_client
def setStatus(self, status):
self.parent.setStatus(status)
def getActorPath(self):
return self.parent.getActorPath() + "/" + self.name.replace("%", " ")
......
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "16/10/2019"
import os
import datetime
try:
import pymongo
import bson
from bson.objectid import ObjectId
USE_MONGODB = True
except ImportError:
print(
"Error when trying to import pymongo and/or bson - no MongoDB connection possible"
)
USE_MONGODB = False
def getDateTimeString():
return datetime.datetime.now()
def getMongoUrl():
mongoUrl = None
if USE_MONGODB:
mongoUrl = os.environ.get("PYPUSHFLOW_MONGOURL", None)
return mongoUrl
def initMongo(name):
workflowId = None
mongoUrl = getMongoUrl()
if mongoUrl is not None:
initiator = os.environ.get("PYPUSHFLOW_INITIATOR", "Unknown")
host = os.environ.get("PYPUSHFLOW_HOST", "Unknown")
port = os.environ.get("PYPUSHFLOW_PORT", "Unknown")
objectId = os.environ.get("PYPUSHFLOW_OBJECTID", str(bson.objectid.ObjectId()))
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
workflowData = {
"_id": bson.objectid.ObjectId(objectId),
"Request ID": objectId,
"startTime": getDateTimeString(),
"initiator": initiator,
"host": host,
"port": port,
"name": name,
"status": "started",
"actors": [],
}
insertOneResult = collection.insert_one(workflowData)
workflowId = insertOneResult.inserted_id
return workflowId
def setMongoStatus(workflowId, status):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({"_id": workflowId})
dictWorkflow["status"] = status
collection.update_one({"_id": workflowId}, {"$set": dictWorkflow}, upsert=False)
def setMongoAttribute(workflowId, attribute, value):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({"_id": workflowId})
dictWorkflow[attribute] = value
collection.update_one({"_id": workflowId}, {"$set": dictWorkflow}, upsert=False)
def closeMongo(workflowId, status="finished"):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({"_id": workflowId})
if dictWorkflow["status"] != "error":
dictWorkflow["status"] = status
dictWorkflow["stopTime"] = getDateTimeString()
collection.update_one({"_id": workflowId}, {"$set": dictWorkflow}, upsert=False)
def initActor(name, workflowId, actorData={}, script=None):
actorId = None
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
actorId = ObjectId()
actor = {
"_id": actorId,
"startTime": getDateTimeString(),
"name": name,
"status": "started",
"script": script,
}
actor.update(actorData)
collection.update({"_id": workflowId}, {"$push": {"actors": actor}})
return actorId
def addDataToActor(workflowId, actorId, actorData={}):
mongoUrl = getMongoUrl()
if mongoUrl is not None:
collection = pymongo.MongoClient(mongoUrl).pybes.pybes
dictWorkflow = collection.find_one({"_id": workflowId})
for actor in dictWorkflow["actors"]:
if actor["_id"] == actorId:
actor.update(actorData)
break
collection.update_one({"_id": workflowId}, {"$set": dictWorkflow}, upsert=False)
......@@ -30,14 +30,15 @@ import logging
import pathlib
import logging.handlers
from pypushflow import UtilsMongoDb
from pypushflow.persistence import db_client
class Workflow(object):
def __init__(self, name):
self.name = name
self.listOnErrorActor = []
self.mongoId = UtilsMongoDb.initMongo(name=name)
self.db_client = db_client()
self.db_client.startWorkflow(name)
self.listActorRef = []
self.logger = self.initLogger(name)
......@@ -96,3 +97,6 @@ class Workflow(object):
logger.debug("Starting new workflow " + name)
logger.debug("")
return logger
def setStatus(self, status):
self.db_client.setWorkflowStatus(status)
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
import os
import logging
import unittest
from pypushflow import UtilsMongoDb
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("testPythonActor")
class TestUtilsMongoDb(unittest.TestCase):
def setUp(self):
os.environ[
"PYPUSHFLOW_MONGOURL"
] = "mongodb://pybes:pybes@linsvensson.esrf.fr:27017/pybes"
os.environ["PYPUSHFLOW_CREATOR"] = "TestUtilsMongoDb"
def tes_initMongo(self):
name = "test_initMongo"
workflowId = UtilsMongoDb.initMongo(name=name)
self.assertIsNotNone(workflowId)
def tes_initActor(self):
name = "test_initMongo"
workflowId = UtilsMongoDb.initMongo(name=name)
self.assertIsNotNone(workflowId)
actorName = "TestActor"
actorId = UtilsMongoDb.initActor(name=actorName, workflowId=workflowId)
self.assertIsNotNone(actorId)
def tes_addDataToActor(self):
name = "test_initMongo"
workflowId = UtilsMongoDb.initMongo(name=name)
self.assertIsNotNone(workflowId)
actorName1 = "TestActor1"
actorId1 = UtilsMongoDb.initActor(name=actorName1, workflowId=workflowId)
self.assertIsNotNone(actorId1)
actorName2 = "TestActor2"
UtilsMongoDb.initActor(name=actorName2, workflowId=workflowId)
inData = {"a": 1}
UtilsMongoDb.addDataToActor(
workflowId=workflowId, actorId=actorId1, actorData={"inData": inData}
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment