Commit 690d444f authored by Wout De Nolf's avatar Wout De Nolf
Browse files

separate classes for workflow persistence

parent 81b6802c
"""Persistent recording of a workflow executions.
When the "PYPUSHFLOW_MONGOURL" environment variable is specified,
the default database client is the PyBes client. Otherwise the
default client is a dummy client.
"""
import os
from typing import Callable
from .interface import WorkflowDbClient
from .pymongo import PyMongoWorkflowDbClient # noqa F401
from .pybes import PyBesWorkflowDbClient # noqa F401
from .mongita import MemoryWorkflowDbClient # noqa F401
from .dummy import DummyWorkflowDbClient # noqa F401
DEFAULT_DB_TYPE = "dummy"
def db_client(*args, db_type=None, **kwargs) -> WorkflowDbClient:
if db_type is None:
if os.environ.get("PYPUSHFLOW_MONGOURL"):
db_type = "pybes"
else:
db_type = DEFAULT_DB_TYPE
db_client_class = WorkflowDbClient.get_dbclient_class(db_type)
client = db_client_class(*args, **kwargs)
client.connect()
return client
def register_actorinfo_filter(method: Callable):
WorkflowDbClient.register_actorinfo_filter(method)
from typing import Optional
from .interface import WorkflowDbClient
class DummyWorkflowDbClient(WorkflowDbClient, register_name="dummy"):
"""Client without a database. Used for testing purposes."""
def connect(self, *args, **kw):
pass
def disconnect(self, *args, **kw):
pass
def startWorkflow(self, name: str):
pass
def endWorkflow(self, status="finished") -> None:
pass
def updateWorkflowInfo(self, info: dict) -> None:
pass
def getWorkflowInfo(self) -> Optional[dict]:
pass
def startActor(
self, name: str, info: Optional[dict] = None, script: Optional[str] = None
):
pass
def endActor(self, actorId, status="finished") -> None:
pass
def updateActorInfo(self, actorId, info: dict) -> None:
pass
def getActorInfo(self, actorId) -> Optional[None]:
pass
from typing import Callable, Optional
class WorkflowDbClient:
"""Client interface of a database for storing workflow executions."""
_REGISTRY = dict()
_ACTORINFO_FILTERS = list()
def __init_subclass__(cls, register_name=None) -> None:
super().__init_subclass__()
if register_name:
WorkflowDbClient._REGISTRY[register_name] = cls
@classmethod
def get_dbclient_class(cls, name):
return WorkflowDbClient._REGISTRY.get(name, None)
@classmethod
def register_actorinfo_filter(cls, method: Callable[[dict], dict]):
if method not in cls._ACTORINFO_FILTERS:
WorkflowDbClient._ACTORINFO_FILTERS.append(method)
@classmethod
def apply_actorinfo_filters(cls, info: dict) -> dict:
for method in WorkflowDbClient._ACTORINFO_FILTERS:
info = method(info)
return info
def connect(self, *args, **kw):
raise NotImplementedError
def disconnect(self, *args, **kw):
raise NotImplementedError
def startWorkflow(self, name: str):
raise NotImplementedError
def endWorkflow(self, status="finished") -> None:
raise NotImplementedError
def updateWorkflowInfo(self, info: dict) -> None:
raise NotImplementedError
def setWorkflowStatus(self, status: str) -> None:
self.updateWorkflowInfo({"status": status})
def getWorkflowInfo(self) -> Optional[dict]:
raise NotImplementedError
def startActor(
self, name: str, info: Optional[dict] = None, script: Optional[str] = None
):
raise NotImplementedError
def endActor(self, actorId, status="finished") -> None:
raise NotImplementedError
def updateActorInfo(self, actorId, info: dict) -> None:
raise NotImplementedError
def setActorStatus(self, actorId, status: str) -> None:
self.updateActorInfo(actorId, {"status": status})
def getActorInfo(self, actorId) -> Optional[None]:
raise NotImplementedError
from typing import Optional
try:
from bson.objectid import ObjectId
from mongita import MongitaClientMemory
except Exception:
ObjectId = None
MongitaClientMemory = None
from .mongo import MongoWorkflowDbClient
class MemoryWorkflowDbClient(MongoWorkflowDbClient, register_name="memory"):
"""Client of an in-memory Mongo database for storing workflow executions.
Used for testing purposes.
"""
def connect(self):
if MongitaClientMemory is None:
return
self._collection = MongitaClientMemory()["ppf"]["ppf"]
def disconnect(self, *args, **kw):
self._collection = None
def generateWorkflowId(self, oid: Optional[str] = None) -> ObjectId:
return ObjectId(oid=oid)
def generateActorId(self, oid: Optional[str] = None) -> ObjectId:
return ObjectId(oid=oid)
def _appendActorInfo(self, actorInfo: dict):
workflowInfo = self._workflowInfo
workflowInfo["actors"].append(actorInfo)
self._workflowInfo = workflowInfo
import logging
from datetime import datetime
from typing import Optional
from .interface import WorkflowDbClient
logger = logging.getLogger(__name__)
class MongoWorkflowDbClient(WorkflowDbClient):
"""Client interface of a Mongo database for storing workflow executions."""
def __init__(self):
self._collection = None
self._workflowId = None
def generateWorkflowId(self, *args, **kw):
raise NotImplementedError
def generateActorId(self, *args, **kw):
raise NotImplementedError
def startWorkflow(self, name: str):
if self._collection is None:
return
if self._workflowId is not None:
raise RuntimeError("Workflow start already logged")
workflowInfo = self._generateInitialWorkflowInfo()
workflowInfo["name"] = name
workflowInfo["status"] = "started"
workflowInfo["startTime"] = datetime.now()
try:
self._collection.insert_one(workflowInfo)
except Exception:
self._collection = None
logger.exception("Mongo database error")
self._workflowId = workflowInfo["_id"]
def endWorkflow(self, status="finished") -> None:
if self._skip:
return
workflowInfo = self._workflowInfo
if workflowInfo["status"] != "error":
workflowInfo["status"] = status
workflowInfo["stopTime"] = datetime.now()
self._workflowInfo = workflowInfo
def updateWorkflowInfo(self, info: dict) -> None:
if self._skip:
return
workflowInfo = self._workflowInfo
workflowInfo.update(info)
self._workflowInfo = workflowInfo
def getWorkflowInfo(self) -> Optional[dict]:
if self._collection is None:
return
return self._workflowInfo
def startActor(self, name: str, info: Optional[str] = None):
if self._skip:
return
actorInfo = self._generateInitialActorInfo()
actorInfo["name"] = name
actorInfo["status"] = "started"
if info:
actorInfo.update(info)
actorInfo["startTime"] = datetime.now()
actorInfo = self.apply_actorinfo_filters(actorInfo)
self._appendActorInfo(actorInfo)
return actorInfo["_id"]
def _appendActorInfo(self, actorInfo: dict):
raise NotImplementedError
def endActor(self, actorId, status="finished") -> None:
if self._skip:
return
workflowInfo = self._workflowInfo
for actorInfo in workflowInfo["actors"]:
if actorInfo["_id"] == actorId:
if actorInfo["status"] != "error":
actorInfo["status"] = status
actorInfo["stopTime"] = datetime.now()
self._workflowInfo = workflowInfo
break
def updateActorInfo(self, actorId, info: dict) -> None:
if self._skip:
return
info = self.apply_actorinfo_filters(info)
workflowInfo = self._workflowInfo
for actorInfo in workflowInfo["actors"]:
if actorInfo["_id"] == actorId:
actorInfo.update(info)
self._workflowInfo = workflowInfo
break
def getActorInfo(self, actorId) -> Optional[None]:
if self._skip:
return
workflowInfo = self._workflowInfo
for actorInfo in workflowInfo["actors"]:
if actorInfo["_id"] == actorId:
return actorInfo
@property
def _skip(self):
if self._collection is None:
return True
if self._workflowId is None:
raise RuntimeError("Workflow start not logged")
return False
def _generateInitialWorkflowInfo(self) -> dict:
oid = self.generateWorkflowId()
return {
"_id": oid,
"Request ID": str(oid),
"name": "unknown",
"status": "unknown",
"actors": [],
}
def _generateInitialActorInfo(self) -> dict:
oid = self.generateActorId()
return {
"_id": oid,
"name": "unknown",
"status": "unknown",
}
@property
def _workflowInfo(self) -> dict:
return self._collection.find_one({"_id": self._workflowId})
@_workflowInfo.setter
def _workflowInfo(self, info: dict):
self._collection.update_one(
{"_id": self._workflowId}, {"$set": info}, upsert=False
)
import os
from typing import Optional
from .pymongo import PyMongoWorkflowDbClient
class PyBesWorkflowDbClient(PyMongoWorkflowDbClient, register_name="pybes"):
"""Client of the PyBes Mongo database for storing workflow executions."""
def connect(self, url: Optional[str] = None):
if not url:
url = os.environ.get("PYPUSHFLOW_MONGOURL", None)
super().connect(url=url, database="pybes", collection="pybes")
def _generateInitialWorkflowInfo(self) -> dict:
initiator = os.environ.get("PYPUSHFLOW_INITIATOR", "Unknown")
host = os.environ.get("PYPUSHFLOW_HOST", "Unknown")
port = os.environ.get("PYPUSHFLOW_PORT", "Unknown")
return {
"initiator": initiator,
"host": host,
"port": port,
}
def generateWorkflowId(self, oid: Optional[str] = None):
if not oid:
oid = os.environ.get("PYPUSHFLOW_OBJECTID")
super().generateWorkflowId(oid=oid)
from typing import Optional
try:
from bson.objectid import ObjectId
from pymongo import MongoClient
except Exception:
ObjectId = None
MongoClient = None
from .mongo import MongoWorkflowDbClient
class PyMongoWorkflowDbClient(MongoWorkflowDbClient, register_name="pymongo"):
"""Client of an external Mongo database for storing workflow executions."""
def connect(self, url: str, database: str, collection: str):
if MongoClient is None:
return
client = MongoClient(url, serverSelectionTimeoutMS=1000)
self._client = client
self._collection = client[database][collection]
def disconnect(self, *args, **kw):
self._collection = None
if self._client is not None:
self._client.close()
self._client = None
def generateWorkflowId(self, oid: Optional[str] = None) -> ObjectId:
return ObjectId(oid=oid)
def generateActorId(self, oid: Optional[str] = None) -> ObjectId:
return ObjectId(oid=oid)
def _appendActorInfo(self, actorInfo: dict):
self._collection.update_one(
{"_id": self._workflowId}, {"$push": {"actors": actorInfo}}
)
......@@ -22,3 +22,14 @@
__authors__ = ["O. Svensson"]
__license__ = "MIT"
__date__ = "28/05/2019"
from pypushflow import persistence
persistence.DEFAULT_DB_TYPE = "memory"
def actorinfo_filter(info):
return info
persistence.register_actorinfo_filter(actorinfo_filter)
import unittest
import os
class PyBesTestCase(unittest.TestCase):
def setUp(self) -> None:
os.environ[
"PYPUSHFLOW_MONGOURL"
] = "mongodb://pybes:pybes@linsvensson.esrf.fr:27017/pybes"
os.environ["PYPUSHFLOW_CREATOR"] = "PyPushflowUnitTests"
def tearDown(self) -> None:
os.environ.pop("PYPUSHFLOW_MONGOURL", None)
os.environ.pop("PYPUSHFLOW_CREATOR", None)
import unittest
from pypushflow.persistence import db_client
from .pybesTestCase import PyBesTestCase
class PersistenceTests:
def setUp(self):
super().setUp()
self.skip_message = ""
def test_Workflow(self):
name = "test_startWorkflow"
self.db_client.startWorkflow(name=name)
info = self.db_client.getWorkflowInfo()
if info is None:
self.skipTest(self.skip_message)
return
self.assertEqual(info["status"], "started")
self.db_client.updateWorkflowInfo({"status": "error"})
status = self.db_client.getWorkflowInfo()["status"]
self.assertEqual(status, "error")
self.db_client.endWorkflow()
status = self.db_client.getWorkflowInfo()["status"]
self.assertEqual(status, "error")
def test_Actor(self):
name = "test_startWorkflow"
self.db_client.startWorkflow(name=name)
actorName1 = "TestActor1"
actorId1 = self.db_client.startActor(actorName1)
info = self.db_client.getActorInfo(actorId1)
if info is None:
self.skipTest(self.skip_message)
return
self.assertEqual(info["status"], "started")
self.db_client.updateActorInfo(actorId1, {"status": "error"})
status = self.db_client.getActorInfo(actorId1)["status"]
self.assertEqual(status, "error")
self.db_client.endActor(actorId1)
status = self.db_client.getActorInfo(actorId1)["status"]
self.assertEqual(status, "error")
actorName2 = "TestActor2"
actorId2 = self.db_client.startActor(name=actorName2)
status = self.db_client.getActorInfo(actorId2)["status"]
self.assertEqual(status, "started")
self.db_client.updateActorInfo(actorId2, {"data": {"a": 1}})
data = self.db_client.getActorInfo(actorId2)["data"]
self.assertEqual(data, {"a": 1})
self.db_client.endActor(actorId2)
status = self.db_client.getActorInfo(actorId2)["status"]
self.assertEqual(status, "finished")
class TestDummyPersistence(unittest.TestCase, PersistenceTests):
def setUp(self):
super().setUp()
self.skip_message = "dummy"
self.db_client = db_client(db_type="dummy")
self.db_client.connect()
class TestMemoryPersistence(unittest.TestCase, PersistenceTests):
def setUp(self):
super().setUp()
self.skip_message = "requires the 'mongita' library"
self.db_client = db_client(db_type="memory")
self.db_client.connect()
class TestPyBesPersistence(PyBesTestCase, PersistenceTests):
def setUp(self):
super().setUp()
self.skip_message = "requires external database and the 'pymongo' library"
self.db_client = db_client(db_type="pybes")
self.db_client.connect()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment