Commit e8dd5d88 authored by Olof Svensson
Browse files

Merge branch '8-make-moml-logging-configurable' into 'main'

Resolve "Make Mongo logging configurable"

Closes #8

See merge request !12
parents 62fae3fa 2902c292
Pipeline #55193 passed with stages
in 47 seconds
......@@ -25,7 +25,6 @@ __date__ = "28/05/2019"
import logging
from pypushflow import UtilsMongoDb
from pypushflow.ThreadCountingActor import ThreadCountingActor
logger = logging.getLogger("pypushflow")
......@@ -54,30 +53,24 @@ class AbstractActor(ThreadCountingActor):
)
actor.trigger(inData)
def uploadInDataToMongo(self, actorData={}, script=None):
def uploadInDataToMongo(self, actorData=None, script=None):
if self.parent is not None:
if self.parent.mongoId is not None:
actorPath = self.getActorPath() + "/" + self.name
self.actorId = UtilsMongoDb.initActor(
workflowId=self.parent.mongoId,
name=actorPath,
actorData=actorData,
script=script,
)
def uploadOutDataToMongo(self, actorData={}, script=None):
if self.parent is not None:
if self.parent.mongoId is not None:
UtilsMongoDb.addDataToActor(
workflowId=self.parent.mongoId,
actorId=self.actorId,
actorData=actorData,
)
name = self.getActorPath() + "/" + self.name
if actorData:
info = dict(actorData)
else:
info = dict()
if script:
info["script"] = script
self.actorId = self.parent.db_client.startActor(name=name, info=info)
def uploadOutDataToMongo(self, actorData=None):
if actorData and self.actorId is not None:
self.parent.db_client.updateActorInfo(self.actorId, info=actorData)
def setMongoAttribute(self, attribute, value):
if self.parent is not None:
if self.parent.mongoId is not None:
UtilsMongoDb.setMongoAttribute(self.parent.mongoId, attribute, value)
if self.actorId is not None:
self.parent.db_client.updateActorInfo(self.actorId, info={attribute: value})
def getActorPath(self):
    """Return the actor path reported by this actor's parent."""
    owner = self.parent
    return owner.getActorPath()
......
......@@ -23,8 +23,6 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow import UtilsMongoDb
from pypushflow.AbstractActor import AbstractActor
......@@ -33,6 +31,6 @@ class ErrorHandler(AbstractActor):
super().__init__(parent=parent, name=name, **kw)
def trigger(self, inData):
if self.parent is not None and hasattr(self.parent, "mongoId"):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, "error")
if self.parent is not None:
self.parent.setStatus("error")
super().trigger(inData=inData)
......@@ -23,8 +23,6 @@ __authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow import UtilsMongoDb
from pypushflow.AbstractActor import AbstractActor
......@@ -34,10 +32,6 @@ class RequestStatus(AbstractActor):
self.status = status
def trigger(self, inData):
if (
self.parent is not None
and hasattr(self.parent, "mongoId")
and self.status is not None
):
UtilsMongoDb.setMongoStatus(self.parent.mongoId, self.status)
if self.status is not None and self.parent is not None:
self.parent.setStatus(self.status)
super().trigger(inData=inData)
......@@ -25,7 +25,6 @@ __date__ = "28/05/2019"
import logging
from pypushflow import UtilsMongoDb
from pypushflow import Submodel
from pypushflow.ThreadCountingActor import ThreadCountingActor
......@@ -74,11 +73,11 @@ class StopActor(ThreadCountingActor):
logger.debug(
"In {0}, parent {1}, finished".format(self.name, self.parent.name)
)
UtilsMongoDb.closeMongo(self.parent.mongoId, status="finished")
self.parent.setStatus("finished")
else:
logger.error(
"In {0}, parent {1}, timeout detected".format(
self.name, self.parent.name
)
)
UtilsMongoDb.closeMongo(self.parent.mongoId, status="timeout")
self.parent.setStatus("timeout")
......@@ -74,7 +74,6 @@ class Submodel:
thread_counter=None,
):
self.parent = parent
self.mongoId = self.parent.mongoId
self.name = name
if errorHandler is None:
self.errorHandler = parent
......@@ -87,6 +86,13 @@ class Submodel:
self, portName, thread_counter=thread_counter
)
@property
def db_client(self):
    """Database client of the parent, so a submodel shares its parent's client."""
    owner = self.parent
    return owner.db_client
def setStatus(self, status):
    """Forward a status change to the parent."""
    owner = self.parent
    owner.setStatus(status)
def getActorPath(self):
    """Return the parent's actor path extended with this submodel's name.

    Every "%" in the name is replaced by a space before it is appended.
    """
    prefix = self.parent.getActorPath()
    label = self.name.replace("%", " ")
    return prefix + "/" + label
......
#
# Copyright (c) European Synchrotron Radiation Facility (ESRF)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "16/10/2019"
import os
import datetime
try:
import pymongo
import bson
from bson.objectid import ObjectId
USE_MONGODB = True
except ImportError:
print(
"Error when trying to import pymongo and/or bson - no MongoDB connection possible"
)
USE_MONGODB = False
def getDateTimeString():
    """Return the current local time.

    Despite the name, the returned value is a ``datetime.datetime`` object,
    not a formatted string.
    """
    now = datetime.datetime.now()
    return now
def getMongoUrl():
    """Return the MongoDB URL from ``PYPUSHFLOW_MONGOURL``.

    Returns ``None`` when the MongoDB libraries could not be imported
    (``USE_MONGODB`` is false) or when the environment variable is not set.
    """
    if not USE_MONGODB:
        return None
    return os.environ.get("PYPUSHFLOW_MONGOURL")
def initMongo(name):
    """Insert a new workflow document and return its id.

    The document is written to the ``pybes.pybes`` collection with status
    "started" and an empty actor list. Initiator, host, port and object id
    are read from the ``PYPUSHFLOW_*`` environment variables.

    Returns ``None`` when no Mongo URL is available.
    """
    mongoUrl = getMongoUrl()
    if mongoUrl is None:
        return None
    env = os.environ
    initiator = env.get("PYPUSHFLOW_INITIATOR", "Unknown")
    host = env.get("PYPUSHFLOW_HOST", "Unknown")
    port = env.get("PYPUSHFLOW_PORT", "Unknown")
    # The fallback id is generated eagerly, whether or not the variable is set.
    objectId = env.get("PYPUSHFLOW_OBJECTID", str(bson.objectid.ObjectId()))
    client = pymongo.MongoClient(mongoUrl)
    collection = client.pybes.pybes
    workflowData = {
        "_id": bson.objectid.ObjectId(objectId),
        "Request ID": objectId,
        "startTime": getDateTimeString(),
        "initiator": initiator,
        "host": host,
        "port": port,
        "name": name,
        "status": "started",
        "actors": [],
    }
    return collection.insert_one(workflowData).inserted_id
def setMongoStatus(workflowId, status):
    """Set the "status" field of a workflow document.

    Only the status field is written. The previous implementation read the
    whole document and wrote every field back, which could overwrite actor
    entries pushed concurrently and raised ``TypeError`` when the document
    did not exist. A missing document is now a no-op.

    Does nothing when no Mongo URL is available.
    """
    mongoUrl = getMongoUrl()
    if mongoUrl is None:
        return
    collection = pymongo.MongoClient(mongoUrl).pybes.pybes
    collection.update_one(
        {"_id": workflowId}, {"$set": {"status": status}}, upsert=False
    )
def setMongoAttribute(workflowId, attribute, value):
    """Set a single top-level attribute of a workflow document.

    Only ``attribute`` is written. The previous implementation rewrote the
    complete document from a stale snapshot, which could discard concurrent
    updates, and raised ``TypeError`` when the document did not exist. A
    missing document is now a no-op.

    Does nothing when no Mongo URL is available.
    """
    mongoUrl = getMongoUrl()
    if mongoUrl is None:
        return
    collection = pymongo.MongoClient(mongoUrl).pybes.pybes
    collection.update_one(
        {"_id": workflowId}, {"$set": {attribute: value}}, upsert=False
    )
def closeMongo(workflowId, status="finished"):
    """Record the stop time of a workflow and set its final status.

    The status is only changed when the stored status is not "error", so an
    error recorded earlier is preserved. The stop time is always written.

    Both writes are targeted updates. The previous implementation rewrote
    the whole document from a snapshot, which could discard concurrent
    updates, and raised ``TypeError`` when the document did not exist.

    Does nothing when no Mongo URL is available.
    """
    mongoUrl = getMongoUrl()
    if mongoUrl is None:
        return
    collection = pymongo.MongoClient(mongoUrl).pybes.pybes
    # Server-side filter keeps an existing "error" status untouched.
    collection.update_one(
        {"_id": workflowId, "status": {"$ne": "error"}},
        {"$set": {"status": status}},
        upsert=False,
    )
    collection.update_one(
        {"_id": workflowId},
        {"$set": {"stopTime": getDateTimeString()}},
        upsert=False,
    )
def initActor(name, workflowId, actorData=None, script=None):
    """Append a new actor entry to a workflow document and return its id.

    The entry starts with a fresh ``ObjectId``, the start time, ``name``,
    status "started" and ``script``; the items of ``actorData`` are merged on
    top and may override those fields.

    ``actorData`` defaults to ``None`` instead of a shared mutable ``{}``.
    The write uses ``update_one`` because the legacy ``Collection.update``
    method is not available in PyMongo 4.

    Returns ``None`` when no Mongo URL is available.
    """
    actorId = None
    mongoUrl = getMongoUrl()
    if mongoUrl is not None:
        collection = pymongo.MongoClient(mongoUrl).pybes.pybes
        actorId = ObjectId()
        actor = {
            "_id": actorId,
            "startTime": getDateTimeString(),
            "name": name,
            "status": "started",
            "script": script,
        }
        if actorData:
            actor.update(actorData)
        collection.update_one({"_id": workflowId}, {"$push": {"actors": actor}})
    return actorId
def addDataToActor(workflowId, actorId, actorData=None):
    """Merge ``actorData`` into the first actor entry whose "_id" is ``actorId``.

    The update addresses the matching element of the "actors" array with the
    positional ``$`` operator, so only the given fields of that one actor are
    written. The previous implementation rewrote the whole workflow document
    from a snapshot, which could discard concurrent updates, and raised
    ``TypeError`` when the workflow document did not exist.

    ``actorData`` defaults to ``None`` instead of a shared mutable ``{}``.
    Does nothing when no Mongo URL is available or there is no data to add.
    """
    mongoUrl = getMongoUrl()
    if mongoUrl is None or not actorData:
        return
    collection = pymongo.MongoClient(mongoUrl).pybes.pybes
    fields = {"actors.$." + key: value for key, value in actorData.items()}
    collection.update_one(
        {"_id": workflowId, "actors._id": actorId},
        {"$set": fields},
        upsert=False,
    )
......@@ -30,14 +30,15 @@ import logging
import pathlib
import logging.handlers
from pypushflow import UtilsMongoDb
from pypushflow.persistence import db_client
class Workflow(object):
def __init__(self, name):
self.name = name
self.listOnErrorActor = []
self.mongoId = UtilsMongoDb.initMongo(name=name)
self.db_client = db_client()
self.db_client.startWorkflow(name)
self.listActorRef = []
self.logger = self.initLogger(name)
......@@ -96,3 +97,6 @@ class Workflow(object):
logger.debug("Starting new workflow " + name)
logger.debug("")
return logger
def setStatus(self, status):
    """Record the workflow status through the database client."""
    client = self.db_client
    client.setWorkflowStatus(status)
"""Persistent recording of a workflow executions.
When the "PYPUSHFLOW_MONGOURL" environment variable is specified,
the default database client is the PyBes client. Otherwise the
default client is a dummy client.
"""
import os
from typing import Callable
from .interface import WorkflowDbClient
from .pymongo import PyMongoWorkflowDbClient # noqa F401
from .pybes import PyBesWorkflowDbClient # noqa F401
from .mongita import MemoryWorkflowDbClient # noqa F401
from .dummy import DummyWorkflowDbClient # noqa F401
DEFAULT_DB_TYPE = "dummy"
def db_client(*args, db_type=None, **kwargs) -> WorkflowDbClient:
    """Create and connect a workflow database client.

    :param db_type: registered client name. When ``None``, "pybes" is used if
        the ``PYPUSHFLOW_MONGOURL`` environment variable is set to a
        non-empty value, otherwise ``DEFAULT_DB_TYPE``.
    :param args: positional arguments forwarded to the client constructor.
    :param kwargs: keyword arguments forwarded to the client constructor.
    :returns: the client, after ``connect()`` has been called on it.
    :raises ValueError: when no client class is registered under ``db_type``
        (previously this surfaced as an opaque "'NoneType' object is not
        callable" ``TypeError``).
    """
    if db_type is None:
        if os.environ.get("PYPUSHFLOW_MONGOURL"):
            db_type = "pybes"
        else:
            db_type = DEFAULT_DB_TYPE
    db_client_class = WorkflowDbClient.get_dbclient_class(db_type)
    if db_client_class is None:
        raise ValueError(f"Unknown workflow database client type: {db_type!r}")
    client = db_client_class(*args, **kwargs)
    client.connect()
    return client
def register_actorinfo_filter(method: Callable):
    """Register ``method`` as an actor-info filter on ``WorkflowDbClient``."""
    register = WorkflowDbClient.register_actorinfo_filter
    register(method)
from typing import Optional
from .interface import WorkflowDbClient
class DummyWorkflowDbClient(WorkflowDbClient, register_name="dummy"):
    """Client without a database. Used for testing purposes.

    Every method accepts the interface arguments, stores nothing and
    returns ``None``.
    """

    def connect(self, *args, **kw):
        """No connection is made."""
        return None

    def disconnect(self, *args, **kw):
        """Nothing to disconnect from."""
        return None

    def startWorkflow(self, name: str):
        """Ignore the workflow start."""
        return None

    def endWorkflow(self, status="finished") -> None:
        """Ignore the workflow end."""
        return None

    def updateWorkflowInfo(self, info: dict) -> None:
        """Discard the workflow info."""
        return None

    def getWorkflowInfo(self) -> Optional[dict]:
        """No workflow info is kept; always ``None``."""
        return None

    def startActor(
        self, name: str, info: Optional[dict] = None, script: Optional[str] = None
    ):
        """Ignore the actor start; no actor id is produced."""
        return None

    def endActor(self, actorId, status="finished") -> None:
        """Ignore the actor end."""
        return None

    def updateActorInfo(self, actorId, info: dict) -> None:
        """Discard the actor info."""
        return None

    def getActorInfo(self, actorId) -> Optional[None]:
        """No actor info is kept; always ``None``."""
        return None
from typing import Callable, Optional
class WorkflowDbClient:
    """Client interface of a database for storing workflow executions.

    Subclasses register themselves by passing ``register_name="..."`` in
    their class statement and can then be looked up with
    :meth:`get_dbclient_class`. Actor-info filters are shared by all clients.
    """

    # Registered client classes, keyed by register name.
    _REGISTRY = dict()
    # Callables applied in registration order by apply_actorinfo_filters.
    _ACTORINFO_FILTERS = list()

    def __init_subclass__(cls, register_name=None, **kwargs) -> None:
        # Forward remaining class keywords so that other bases defining
        # __init_subclass__ keep working in multiple inheritance.
        super().__init_subclass__(**kwargs)
        if register_name:
            WorkflowDbClient._REGISTRY[register_name] = cls

    @classmethod
    def get_dbclient_class(cls, name):
        """Return the class registered under ``name``, or ``None``."""
        return WorkflowDbClient._REGISTRY.get(name, None)

    @classmethod
    def register_actorinfo_filter(cls, method: Callable[[dict], dict]):
        """Add ``method`` to the shared filter list unless already present."""
        filters = WorkflowDbClient._ACTORINFO_FILTERS
        if method not in filters:
            filters.append(method)

    @classmethod
    def apply_actorinfo_filters(cls, info: dict) -> dict:
        """Pass ``info`` through every registered filter, in order."""
        for method in WorkflowDbClient._ACTORINFO_FILTERS:
            info = method(info)
        return info

    def connect(self, *args, **kw):
        """Open the database connection. Must be implemented by subclasses."""
        raise NotImplementedError

    def disconnect(self, *args, **kw):
        """Close the database connection. Must be implemented by subclasses."""
        raise NotImplementedError

    def startWorkflow(self, name: str):
        """Record the start of a workflow. Must be implemented by subclasses."""
        raise NotImplementedError

    def endWorkflow(self, status="finished") -> None:
        """Record the end of the workflow. Must be implemented by subclasses."""
        raise NotImplementedError

    def updateWorkflowInfo(self, info: dict) -> None:
        """Merge ``info`` into the workflow record. Must be implemented by subclasses."""
        raise NotImplementedError

    def setWorkflowStatus(self, status: str) -> None:
        """Set the workflow status via :meth:`updateWorkflowInfo`."""
        self.updateWorkflowInfo({"status": status})

    def getWorkflowInfo(self) -> Optional[dict]:
        """Return the workflow record. Must be implemented by subclasses."""
        raise NotImplementedError

    def startActor(
        self, name: str, info: Optional[dict] = None, script: Optional[str] = None
    ):
        """Record the start of an actor. Must be implemented by subclasses."""
        raise NotImplementedError

    def endActor(self, actorId, status="finished") -> None:
        """Record the end of an actor. Must be implemented by subclasses."""
        raise NotImplementedError

    def updateActorInfo(self, actorId, info: dict) -> None:
        """Merge ``info`` into an actor record. Must be implemented by subclasses."""
        raise NotImplementedError

    def setActorStatus(self, actorId, status: str) -> None:
        """Set an actor's status via :meth:`updateActorInfo`."""
        self.updateActorInfo(actorId, {"status": status})

    def getActorInfo(self, actorId) -> Optional[dict]:
        """Return an actor record. Must be implemented by subclasses."""
        raise NotImplementedError
from typing import Optional
try:
from bson.objectid import ObjectId
from mongita import MongitaClientMemory
except Exception:
ObjectId = None
MongitaClientMemory = None
from .mongo import MongoWorkflowDbClient
class MemoryWorkflowDbClient(MongoWorkflowDbClient, register_name="memory"):
    """Client of an in-memory Mongo database for storing workflow executions.

    Used for testing purposes. When the optional imports failed
    (``MongitaClientMemory`` is ``None``), ``connect`` leaves the collection
    unset.
    """

    def connect(self):
        """Bind to the "ppf"/"ppf" collection of a new in-memory client."""
        if MongitaClientMemory is not None:
            client = MongitaClientMemory()
            self._collection = client["ppf"]["ppf"]

    def disconnect(self, *args, **kw):
        """Drop the reference to the collection."""
        self._collection = None

    def generateWorkflowId(self, oid: Optional[str] = None) -> ObjectId:
        """Return an ``ObjectId`` built from ``oid``."""
        return ObjectId(oid=oid)

    def generateActorId(self, oid: Optional[str] = None) -> ObjectId:
        """Return an ``ObjectId`` built from ``oid``."""
        return ObjectId(oid=oid)

    def _appendActorInfo(self, actorInfo: dict):
        """Append ``actorInfo`` to the workflow's actors and store the result."""
        # Read, modify, then assign back through ``_workflowInfo``
        # (its accessor is defined outside this class).
        record = self._workflowInfo
        record["actors"].append(actorInfo)
        self._workflowInfo = record
import logging
from datetime import datetime
from typing import Optional
from .interface import WorkflowDbClient
logger = logging.getLogger(__name__)
class MongoWorkflowDbClient(WorkflowDbClient):
"""Client interface of a Mongo database for storing workflow executions."""
def __init__(self):
self._collection = None
self._workflowId = None
def generateWorkflowId(self, *args, **kw):
    """Return a new workflow id. Not implemented here; always raises.

    Called without arguments by ``_generateInitialWorkflowInfo``.
    """
    raise NotImplementedError
def generateActorId(self, *args, **kw):
    """Return a new actor id. Not implemented here; always raises."""
    raise NotImplementedError
def startWorkflow(self, name: str):
    """Insert the initial workflow record with status "started".

    Does nothing when there is no collection. A database error during the
    insert is logged and disables the client by clearing the collection.

    :raises RuntimeError: when a workflow start was already logged.
    """
    collection = self._collection
    if collection is None:
        return
    if self._workflowId is not None:
        raise RuntimeError("Workflow start already logged")
    record = self._generateInitialWorkflowInfo()
    record["name"] = name
    record["status"] = "started"
    record["startTime"] = datetime.now()
    try:
        collection.insert_one(record)
    except Exception:
        self._collection = None
        logger.exception("Mongo database error")
    # The id is remembered even after a failed insert, as before.
    self._workflowId = record["_id"]
def endWorkflow(self, status="finished") -> None:
    """Store the stop time and, unless the workflow is in "error", ``status``."""
    if self._skip:
        return
    record = self._workflowInfo
    already_failed = record["status"] == "error"
    if not already_failed:
        record["status"] = status
    record["stopTime"] = datetime.now()
    self._workflowInfo = record
def updateWorkflowInfo(self, info: dict) -> None:
    """Merge ``info`` into the workflow record and store the result."""
    if not self._skip:
        record = self._workflowInfo
        record.update(info)
        self._workflowInfo = record
def getWorkflowInfo(self) -> Optional[dict]:
    """Return the workflow record, or ``None`` when there is no collection."""
    return None if self._collection is None else self._workflowInfo
def startActor(
    self, name: str, info: Optional[dict] = None, script: Optional[str] = None
):
    """Append a new actor record with status "started" and return its id.

    The signature now matches ``WorkflowDbClient.startActor``: ``info`` is a
    dict (it was annotated as ``str``) and ``script`` is accepted.

    :param name: actor name stored under "name".
    :param info: extra fields merged into the record; may override "name"
        and "status".
    :param script: stored under "script" when given, overriding any
        "script" entry of ``info``.
    :returns: the "_id" of the record, or ``None`` when logging is skipped.
    """
    if self._skip:
        return
    actorInfo = self._generateInitialActorInfo()
    actorInfo["name"] = name
    actorInfo["status"] = "started"
    if info:
        actorInfo.update(info)
    if script:
        actorInfo["script"] = script
    # Set after merging ``info`` so that the start time cannot be overridden.
    actorInfo["startTime"] = datetime.now()
    actorInfo = self.apply_actorinfo_filters(actorInfo)
    self._appendActorInfo(actorInfo)
    return actorInfo["_id"]
def _appendActorInfo(self, actorInfo: dict):
    """Store a new actor record. Not implemented here; always raises.

    Called by ``startActor`` with the filtered actor record.
    """
    raise NotImplementedError
def endActor(self, actorId, status="finished") -> None:
    """Store the stop time of an actor and, unless it is in "error", ``status``.

    Only the first actor whose "_id" equals ``actorId`` is touched; nothing
    is written when no actor matches.
    """
    if self._skip:
        return
    record = self._workflowInfo
    match = next((a for a in record["actors"] if a["_id"] == actorId), None)
    if match is None:
        return
    if match["status"] != "error":
        match["status"] = status
    match["stopTime"] = datetime.now()
    self._workflowInfo = record
def updateActorInfo(self, actorId, info: dict) -> None:
    """Merge the filtered ``info`` into the first actor with id ``actorId``.

    The actor-info filters are applied before the lookup, so they run even
    when no actor matches.
    """
    if self._skip:
        return
    filtered = self.apply_actorinfo_filters(info)
    record = self._workflowInfo
    match = next((a for a in record["actors"] if a["_id"] == actorId), None)
    if match is not None:
        match.update(filtered)
        self._workflowInfo = record
def getActorInfo(self, actorId) -> Optional[dict]:
    """Return the record of the first actor whose "_id" equals ``actorId``.

    Returns ``None`` when logging is skipped or no actor matches. The return
    annotation was ``Optional[None]``, which did not describe the returned
    actor record.
    """
    if self._skip:
        return None
    for actorInfo in self._workflowInfo["actors"]:
        if actorInfo["_id"] == actorId:
            return actorInfo
    return None
@property
def _skip(self):
if self._collection is None:
return True
if self._workflowId is None:
raise RuntimeError("Workflow start not logged")
return False
def _generateInitialWorkflowInfo(self) -> dict:
oid = self.generateWorkflowId()
return {
"_id": oid,
"Request ID": str(oid),
"name": "unknown",
"status": "unknown",
"actors": [],
}
def _generateInitialActorInfo(self) -> dict: