Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
workflow
pypushflow
Commits
1250c8a0
Commit
1250c8a0
authored
Sep 16, 2021
by
Wout De Nolf
Browse files
Merge branch 'enfore_style' into 'main'
Enforce style See merge request
!14
parents
eca70f78
e74daedb
Pipeline
#54695
passed with stages
in 41 seconds
Changes
26
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
.gitlab-ci.yml
View file @
1250c8a0
stages
:
-
style
-
test
flake8
:
stage
:
style
image
:
"
docker-registry.esrf.fr/dau/ewoks:python_3.7"
before_script
:
-
pip install flake8
script
:
-
flake8
black
:
stage
:
style
image
:
"
docker-registry.esrf.fr/dau/ewoks:python_3.7"
before_script
:
-
pip install black
script
:
-
LC_ALL=C.UTF-8 black --check --safe .
test
:
stage
:
test
image
:
python
:
3.
6
image
:
"
docker-registry.esrf.fr/dau/ewoks:
python
_
3.
7"
before_script
:
-
pip install pytest-cov
-
pip install .[test]
script
:
-
pytest --cov=./ .
only
:
refs
:
-
main
-
merge_requests
pypushflow/AbstractActor.py
View file @
1250c8a0
...
...
@@ -28,11 +28,10 @@ import logging
from
pypushflow
import
UtilsMongoDb
from
pypushflow.ThreadCountingActor
import
ThreadCountingActor
logger
=
logging
.
getLogger
(
'
pypushflow
'
)
logger
=
logging
.
getLogger
(
"
pypushflow
"
)
class
AbstractActor
(
ThreadCountingActor
):
def
__init__
(
self
,
parent
=
None
,
name
=
None
,
**
kw
):
super
().
__init__
(
name
=
name
,
parent
=
parent
,
**
kw
)
self
.
listDownStreamActor
=
[]
...
...
@@ -41,29 +40,29 @@ class AbstractActor(ThreadCountingActor):
self
.
finished
=
False
def
connect
(
self
,
actor
):
logger
.
debug
(
'Connecting actor "{0}" to actor "{1}"'
.
format
(
self
.
name
,
actor
.
name
)
)
logger
.
debug
(
'Connecting actor "{0}" to actor "{1}"'
.
format
(
self
.
name
,
actor
.
name
)
)
self
.
listDownStreamActor
.
append
(
actor
)
def
trigger
(
self
,
inData
):
self
.
setStarted
()
self
.
setFinished
()
for
actor
in
self
.
listDownStreamActor
:
logger
.
debug
(
'In actor "{0}", triggering actor "{1}"'
.
format
(
self
.
name
,
actor
.
name
)
)
logger
.
debug
(
'In actor "{0}", triggering actor "{1}"'
.
format
(
self
.
name
,
actor
.
name
)
)
actor
.
trigger
(
inData
)
def
uploadInDataToMongo
(
self
,
actorData
=
{},
script
=
None
):
if
self
.
parent
is
not
None
:
if
self
.
parent
.
mongoId
is
not
None
:
actorPath
=
self
.
getActorPath
()
+
'/'
+
self
.
name
actorPath
=
self
.
getActorPath
()
+
"/"
+
self
.
name
self
.
actorId
=
UtilsMongoDb
.
initActor
(
workflowId
=
self
.
parent
.
mongoId
,
name
=
actorPath
,
actorData
=
actorData
,
script
=
script
script
=
script
,
)
def
uploadOutDataToMongo
(
self
,
actorData
=
{},
script
=
None
):
...
...
@@ -72,7 +71,7 @@ class AbstractActor(ThreadCountingActor):
UtilsMongoDb
.
addDataToActor
(
workflowId
=
self
.
parent
.
mongoId
,
actorId
=
self
.
actorId
,
actorData
=
actorData
actorData
=
actorData
,
)
def
setMongoAttribute
(
self
,
attribute
,
value
):
...
...
@@ -96,4 +95,3 @@ class AbstractActor(ThreadCountingActor):
def
setFinished
(
self
):
logger
.
debug
(
"Setting finished of {0} to True"
.
format
(
self
.
name
))
self
.
finished
=
True
pypushflow/ErrorHandler.py
View file @
1250c8a0
...
...
@@ -29,11 +29,10 @@ from pypushflow.AbstractActor import AbstractActor
class
ErrorHandler
(
AbstractActor
):
def
__init__
(
self
,
parent
=
None
,
name
=
'Error handler'
,
**
kw
):
def
__init__
(
self
,
parent
=
None
,
name
=
"Error handler"
,
**
kw
):
super
().
__init__
(
parent
=
parent
,
name
=
name
,
**
kw
)
def
trigger
(
self
,
inData
):
if
self
.
parent
is
not
None
and
hasattr
(
self
.
parent
,
'
mongoId
'
):
UtilsMongoDb
.
setMongoStatus
(
self
.
parent
.
mongoId
,
'
error
'
)
if
self
.
parent
is
not
None
and
hasattr
(
self
.
parent
,
"
mongoId
"
):
UtilsMongoDb
.
setMongoStatus
(
self
.
parent
.
mongoId
,
"
error
"
)
super
().
trigger
(
inData
=
inData
)
pypushflow/ForkActor.py
View file @
1250c8a0
...
...
@@ -27,6 +27,5 @@ from pypushflow.AbstractActor import AbstractActor
class
ForkActor
(
AbstractActor
):
def
__init__
(
self
,
parent
=
None
,
name
=
'Fork actor'
,
**
kw
):
def
__init__
(
self
,
parent
=
None
,
name
=
"Fork actor"
,
**
kw
):
super
().
__init__
(
parent
=
parent
,
name
=
name
,
**
kw
)
pypushflow/JoinActor.py
View file @
1250c8a0
...
...
@@ -27,8 +27,7 @@ from pypushflow.AbstractActor import AbstractActor
class
JoinActor
(
AbstractActor
):
def
__init__
(
self
,
parent
=
None
,
name
=
'Join actor'
,
**
kw
):
def
__init__
(
self
,
parent
=
None
,
name
=
"Join actor"
,
**
kw
):
super
().
__init__
(
parent
=
parent
,
name
=
name
,
**
kw
)
self
.
numberOfThreads
=
0
self
.
listInData
=
[]
...
...
@@ -45,4 +44,4 @@ class JoinActor(AbstractActor):
for
data
in
self
.
listInData
:
newInData
.
update
(
data
)
for
actor
in
self
.
listDownStreamActor
:
actor
.
trigger
(
newInData
)
\ No newline at end of file
actor
.
trigger
(
newInData
)
pypushflow/PythonActor.py
View file @
1250c8a0
...
...
@@ -37,11 +37,10 @@ import multiprocessing.pool
from
pypushflow.AbstractActor
import
AbstractActor
logger
=
logging
.
getLogger
(
'
pypushflow
'
)
logger
=
logging
.
getLogger
(
"
pypushflow
"
)
class
ActorWrapperException
(
Exception
):
def
__init__
(
self
,
errorMessage
=
""
,
traceBack
=
""
,
data
=
{},
msg
=
None
):
super
(
ActorWrapperException
,
self
).
__init__
(
msg
)
self
.
errorMessage
=
errorMessage
...
...
@@ -55,15 +54,14 @@ def trace_unhandled_exceptions(func):
try
:
outData
=
func
(
*
args
,
**
kwargs
)
except
Exception
as
e
:
errorMessage
=
'
{0}
'
.
format
(
e
)
errorMessage
=
"
{0}
"
.
format
(
e
)
logger
.
exception
(
errorMessage
)
traceBack
=
traceback
.
format_exc
()
return
ActorWrapperException
(
errorMessage
=
errorMessage
,
traceBack
=
traceBack
,
data
=
args
[
1
]
errorMessage
=
errorMessage
,
traceBack
=
traceBack
,
data
=
args
[
1
]
)
return
outData
return
wrapped_func
...
...
@@ -109,32 +107,43 @@ class AsyncFactory:
self
.
hasFinished
=
False
def
call
(
self
,
*
args
,
**
kwargs
):
logger
.
debug
(
'Before apply_async, func={0}, callback={1}, errorCallback={2}'
.
format
(
self
.
func
,
self
.
callback
,
self
.
errorCallback
))
logger
.
debug
(
'args={0}, kwargs={1}'
.
format
(
args
,
kwargs
))
self
.
pool
.
apply_async
(
self
.
func
,
args
=
args
,
kwds
=
kwargs
,
callback
=
self
.
callback
,
error_callback
=
self
.
errorCallback
)
logger
.
debug
(
"Before apply_async, func={0}, callback={1}, errorCallback={2}"
.
format
(
self
.
func
,
self
.
callback
,
self
.
errorCallback
)
)
logger
.
debug
(
"args={0}, kwargs={1}"
.
format
(
args
,
kwargs
))
self
.
pool
.
apply_async
(
self
.
func
,
args
=
args
,
kwds
=
kwargs
,
callback
=
self
.
callback
,
error_callback
=
self
.
errorCallback
,
)
self
.
pool
.
close
()
logger
.
debug
(
'
After apply_async
'
)
logger
.
debug
(
"
After apply_async
"
)
class
ActorWrapper
:
def
__init__
(
self
,
name
,
method
):
self
.
name
=
name
self
.
method
=
method
@
trace_unhandled_exceptions
def
run
(
self
,
*
args
,
**
kwargs
):
logger
.
debug
(
'In actor wrapper for {0}'
.
format
(
self
.
name
))
logger
.
debug
(
'args={0}, kwargs={1}, method={2}'
.
format
(
args
,
kwargs
,
self
.
method
))
logger
.
debug
(
"In actor wrapper for {0}"
.
format
(
self
.
name
))
logger
.
debug
(
"args={0}, kwargs={1}, method={2}"
.
format
(
args
,
kwargs
,
self
.
method
)
)
inData
=
args
[
0
]
outData
=
self
.
method
(
**
inData
)
return
outData
class
PythonActor
(
AbstractActor
):
def
__init__
(
self
,
parent
=
None
,
name
=
'Python Actor'
,
errorHandler
=
None
,
script
=
None
,
**
kw
):
def
__init__
(
self
,
parent
=
None
,
name
=
"Python Actor"
,
errorHandler
=
None
,
script
=
None
,
**
kw
):
super
().
__init__
(
parent
=
parent
,
name
=
name
,
**
kw
)
self
.
parentErrorHandler
=
errorHandler
self
.
listErrorHandler
=
[]
...
...
@@ -150,23 +159,27 @@ class PythonActor(AbstractActor):
def
trigger
(
self
,
inData
):
self
.
setStarted
()
self
.
inData
=
dict
(
inData
)
self
.
uploadInDataToMongo
(
actorData
=
{
'inData'
:
inData
},
script
=
self
.
script
)
logger
.
debug
(
'In trigger {0}, inData = {1}'
.
format
(
self
.
name
,
pprint
.
pformat
(
inData
)))
self
.
uploadInDataToMongo
(
actorData
=
{
"inData"
:
inData
},
script
=
self
.
script
)
logger
.
debug
(
"In trigger {0}, inData = {1}"
.
format
(
self
.
name
,
pprint
.
pformat
(
inData
))
)
if
isinstance
(
inData
,
ActorWrapperException
):
logger
.
error
(
'Error from previous actor! Not running actor {0}'
.
format
(
self
.
name
))
logger
.
error
(
"Error from previous actor! Not running actor {0}"
.
format
(
self
.
name
)
)
if
self
.
parentErrorHandler
is
not
None
:
workflowException
=
inData
oldInData
=
workflowException
.
data
exceptionDict
=
{
'
errorMessage
'
:
workflowException
.
errorMessage
,
'
traceBack
'
:
workflowException
.
traceBack
.
split
(
'
\n
'
),
"
errorMessage
"
:
workflowException
.
errorMessage
,
"
traceBack
"
:
workflowException
.
traceBack
.
split
(
"
\n
"
),
}
oldInData
[
'
WorkflowException
'
]
=
exceptionDict
oldInData
[
"
WorkflowException
"
]
=
exceptionDict
self
.
parentErrorHandler
.
triggerOnError
(
oldInData
)
try
:
module
=
importlib
.
import_module
(
os
.
path
.
splitext
(
self
.
script
)[
0
])
except
Exception
as
e
:
logger
.
error
(
'
Error when trying to import script {0}
'
.
format
(
self
.
script
))
logger
.
error
(
"
Error when trying to import script {0}
"
.
format
(
self
.
script
))
time
.
sleep
(
1
)
self
.
errorHandler
(
inData
,
e
)
else
:
...
...
@@ -184,9 +197,9 @@ class PythonActor(AbstractActor):
self
.
af
.
call
(
self
.
inData
)
def
errorHandler
(
self
,
inData
,
exception
):
logger
.
error
(
'
Error when running actor {0}!
'
.
format
(
self
.
name
))
logger
.
error
(
"
Error when running actor {0}!
"
.
format
(
self
.
name
))
self
.
setFinished
()
errorMessage
=
'
{0}
'
.
format
(
exception
)
errorMessage
=
"
{0}
"
.
format
(
exception
)
logger
.
exception
(
errorMessage
)
traceBack
=
traceback
.
format_exc
().
split
(
"
\n
"
)
# workflowException = WorkflowException(
...
...
@@ -194,9 +207,9 @@ class PythonActor(AbstractActor):
# traceBack=traceBack
# )
outData
=
dict
(
inData
)
outData
[
'
WorkflowException
'
]
=
{
outData
[
"
WorkflowException
"
]
=
{
"errorMessage"
:
errorMessage
,
"traceBack"
:
traceBack
"traceBack"
:
traceBack
,
}
logger
.
error
(
exception
)
for
errorHandler
in
self
.
listErrorHandler
:
...
...
@@ -205,34 +218,46 @@ class PythonActor(AbstractActor):
self
.
parentErrorHandler
.
triggerOnError
(
outData
)
def
triggerDownStreamActor
(
self
,
inData
=
{}):
logger
.
debug
(
'
In triggerDownStreamActor for {0}
'
.
format
(
self
.
name
))
logger
.
debug
(
"
In triggerDownStreamActor for {0}
"
.
format
(
self
.
name
))
self
.
setFinished
()
if
isinstance
(
inData
,
ActorWrapperException
):
logger
.
error
(
'Error from previous actor! Not running down stream actors {0}'
.
format
([
actor
.
name
for
actor
in
self
.
listDownStreamActor
]))
logger
.
error
(
"Error from previous actor! Not running down stream actors {0}"
.
format
(
[
actor
.
name
for
actor
in
self
.
listDownStreamActor
]
)
)
workflowException
=
inData
oldInData
=
workflowException
.
data
exceptionDict
=
{
'
errorMessage
'
:
workflowException
.
errorMessage
,
'
traceBack
'
:
workflowException
.
traceBack
.
split
(
'
\n
'
),
"
errorMessage
"
:
workflowException
.
errorMessage
,
"
traceBack
"
:
workflowException
.
traceBack
.
split
(
"
\n
"
),
}
oldInData
[
'WorkflowException'
]
=
exceptionDict
self
.
uploadOutDataToMongo
(
actorData
=
{
'stopTime'
:
datetime
.
datetime
.
now
(),
'status'
:
'error'
,
'outData'
:
exceptionDict
})
oldInData
[
"WorkflowException"
]
=
exceptionDict
self
.
uploadOutDataToMongo
(
actorData
=
{
"stopTime"
:
datetime
.
datetime
.
now
(),
"status"
:
"error"
,
"outData"
:
exceptionDict
,
}
)
for
errorHandler
in
self
.
listErrorHandler
:
errorHandler
.
trigger
(
oldInData
)
if
self
.
parentErrorHandler
is
not
None
:
logger
.
error
(
'Trigger on error on errorHandler "{0}"'
.
format
(
self
.
parentErrorHandler
.
name
))
logger
.
error
(
'Trigger on error on errorHandler "{0}"'
.
format
(
self
.
parentErrorHandler
.
name
)
)
self
.
parentErrorHandler
.
triggerOnError
(
inData
=
oldInData
)
else
:
outData
=
dict
(
inData
)
self
.
uploadOutDataToMongo
(
actorData
=
{
'stopTime'
:
datetime
.
datetime
.
now
(),
'status'
:
'finished'
,
'outData'
:
outData
})
self
.
uploadOutDataToMongo
(
actorData
=
{
"stopTime"
:
datetime
.
datetime
.
now
(),
"status"
:
"finished"
,
"outData"
:
outData
,
}
)
if
"workflowLogFile"
in
outData
:
self
.
setMongoAttribute
(
"logFile"
,
outData
[
"workflowLogFile"
])
if
"workflowDebugLogFile"
in
outData
:
...
...
@@ -240,6 +265,9 @@ class PythonActor(AbstractActor):
downstreamData
=
dict
(
self
.
inData
)
downstreamData
.
update
(
outData
)
for
downStreamActor
in
self
.
listDownStreamActor
:
logger
.
debug
(
'In trigger {0}, triggering actor {1}, inData={2}'
.
format
(
self
.
name
,
downStreamActor
.
name
,
downstreamData
))
logger
.
debug
(
"In trigger {0}, triggering actor {1}, inData={2}"
.
format
(
self
.
name
,
downStreamActor
.
name
,
downstreamData
)
)
downStreamActor
.
trigger
(
downstreamData
)
pypushflow/RequestStatus.py
View file @
1250c8a0
...
...
@@ -27,13 +27,17 @@ from pypushflow import UtilsMongoDb
from
pypushflow.AbstractActor
import
AbstractActor
class
RequestStatus
(
AbstractActor
):
def
__init__
(
self
,
parent
=
None
,
name
=
'Request status'
,
status
=
None
,
**
kw
):
class
RequestStatus
(
AbstractActor
):
def
__init__
(
self
,
parent
=
None
,
name
=
"Request status"
,
status
=
None
,
**
kw
):
super
().
__init__
(
parent
=
parent
,
name
=
name
,
**
kw
)
self
.
status
=
status
self
.
status
=
status
def
trigger
(
self
,
inData
):
if
self
.
parent
is
not
None
and
hasattr
(
self
.
parent
,
'mongoId'
)
and
self
.
status
is
not
None
:
if
(
self
.
parent
is
not
None
and
hasattr
(
self
.
parent
,
"mongoId"
)
and
self
.
status
is
not
None
):
UtilsMongoDb
.
setMongoStatus
(
self
.
parent
.
mongoId
,
self
.
status
)
super
().
trigger
(
inData
=
inData
)
pypushflow/RouterActor.py
View file @
1250c8a0
...
...
@@ -26,13 +26,20 @@ __date__ = "28/05/2019"
from
pypushflow.AbstractActor
import
AbstractActor
import
logging
logger
=
logging
.
getLogger
(
'pypushflow'
)
logger
=
logging
.
getLogger
(
"pypushflow"
)
class
RouterActor
(
AbstractActor
):
def
__init__
(
self
,
parent
=
None
,
errorHandler
=
None
,
name
=
'Router'
,
itemName
=
None
,
listPort
=
None
,
**
kw
):
class
RouterActor
(
AbstractActor
):
def
__init__
(
self
,
parent
=
None
,
errorHandler
=
None
,
name
=
"Router"
,
itemName
=
None
,
listPort
=
None
,
**
kw
):
super
().
__init__
(
parent
=
parent
,
name
=
name
,
**
kw
)
self
.
errorHandler
=
errorHandler
self
.
name
=
name
...
...
@@ -43,14 +50,16 @@ class RouterActor(AbstractActor):
self
.
listPort
=
listPort
self
.
dictValues
=
{}
def
connect
(
self
,
actor
,
expectedValue
=
'
other
'
):
if
expectedValue
!=
'
other
'
and
not
expectedValue
in
self
.
listPort
:
def
connect
(
self
,
actor
,
expectedValue
=
"
other
"
):
if
expectedValue
!=
"
other
"
and
expectedValue
not
in
self
.
listPort
:
raise
RuntimeError
(
'Port {0} not defined for router actor {1}!'
.
format
(
expectedValue
,
self
.
name
))
"Port {0} not defined for router actor {1}!"
.
format
(
expectedValue
,
self
.
name
)
)
if
expectedValue
in
self
.
dictValues
:
self
.
dictValues
[
expectedValue
].
append
(
actor
)
else
:
else
:
self
.
dictValues
[
expectedValue
]
=
[
actor
]
def
trigger
(
self
,
inData
):
...
...
@@ -61,13 +70,13 @@ class RouterActor(AbstractActor):
if
self
.
itemName
in
inData
:
logger
.
debug
(
'In router actor "{0}", itemName {1} in inData'
.
format
(
self
.
name
,
self
.
itemName
))
self
.
name
,
self
.
itemName
)
)
value
=
inData
[
self
.
itemName
]
logger
.
debug
(
'In router actor "{0}", value = {1}'
.
format
(
self
.
name
,
value
))
if
value
in
[
None
,
'None'
,
'null'
]:
value
=
'null'
logger
.
debug
(
'In router actor "{0}", value = {1}'
.
format
(
self
.
name
,
value
))
if
value
in
[
None
,
"None"
,
"null"
]:
value
=
"null"
elif
type
(
value
)
==
bool
:
if
value
:
value
=
"true"
...
...
@@ -77,13 +86,16 @@ class RouterActor(AbstractActor):
listActor
=
self
.
dictValues
[
value
]
if
listActor
is
None
:
logger
.
debug
(
'In router actor "{0}", actor is None'
)
if
'
other
'
in
self
.
dictValues
:
listActor
=
self
.
dictValues
[
'
other
'
]
if
"
other
"
in
self
.
dictValues
:
listActor
=
self
.
dictValues
[
"
other
"
]
else
:
raise
RuntimeError
(
'No "other" port for router actor "{0}"'
.
format
(
self
.
name
))
'No "other" port for router actor "{0}"'
.
format
(
self
.
name
)
)
for
actor
in
listActor
:
logger
.
debug
(
'In router actor "{0}", triggering actor "{1}"'
.
format
(
self
.
name
,
actor
.
name
))
self
.
name
,
actor
.
name
)
)
actor
.
trigger
(
inData
)
pypushflow/StartActor.py
View file @
1250c8a0
...
...
@@ -27,6 +27,5 @@ from pypushflow.AbstractActor import AbstractActor
class
StartActor
(
AbstractActor
):
def
__init__
(
self
,
parent
=
None
,
name
=
'Start actor'
,
**
kw
):
def
__init__
(
self
,
parent
=
None
,
name
=
"Start actor"
,
**
kw
):
super
().
__init__
(
parent
=
parent
,
name
=
name
,
**
kw
)
pypushflow/StopActor.py
View file @
1250c8a0
...
...
@@ -29,18 +29,19 @@ from pypushflow import UtilsMongoDb
from
pypushflow
import
Submodel
from
pypushflow.ThreadCountingActor
import
ThreadCountingActor
logger
=
logging
.
getLogger
(
'
pypushflow
'
)
logger
=
logging
.
getLogger
(
"
pypushflow
"
)
class
StopActor
(
ThreadCountingActor
):
def
__init__
(
self
,
parent
=
None
,
errorHandler
=
None
,
name
=
'Stop actor'
,
**
kw
):
def
__init__
(
self
,
parent
=
None
,
errorHandler
=
None
,
name
=
"Stop actor"
,
**
kw
):
super
().
__init__
(
name
=
name
,
parent
=
parent
,
**
kw
)
self
.
errorHandler
=
errorHandler
self
.
outData
=
None
def
trigger
(
self
,
inData
):
logger
.
debug
(
'In trigger {0}, errorHandler = {1}'
.
format
(
self
.
name
,
self
.
errorHandler
))
logger
.
debug
(
"In trigger {0}, errorHandler = {1}"
.
format
(
self
.
name
,
self
.
errorHandler
)
)
if
self
.
parent
is
not
None
and
not
isinstance
(
self
.
parent
,
Submodel
.
Submodel
):
# Parent is a Workflow
self
.
outData
=
inData
...
...
@@ -51,10 +52,18 @@ class StopActor(ThreadCountingActor):
def
join
(
self
,
timeout
=
7200
):
if
self
.
parent
is
not
None
:
logger
.
debug
(
'In {0}, parent {1}, before wait_threads_finished'
.
format
(
self
.
name
,
self
.
parent
.
name
))
logger
.
debug
(
"In {0}, parent {1}, before wait_threads_finished"
.
format
(
self
.
name
,
self
.
parent
.
name
)
)
success
=
self
.
_wait_threads_finished
(
timeout
=
timeout
)
if
self
.
parent
is
not
None
:
logger
.
debug
(
'In {0}, parent {1}, after wait_threads_finished'
.
format
(
self
.
name
,
self
.
parent
.
name
))
logger
.
debug
(
"In {0}, parent {1}, after wait_threads_finished"
.
format
(
self
.
name
,
self
.
parent
.
name
)
)
self
.
_finalizeInMongo
(
success
)
return
success
...
...
@@ -62,8 +71,14 @@ class StopActor(ThreadCountingActor):
if
self
.
parent
is
None
:
return
if
success
:
logger
.
debug
(
'In {0}, parent {1}, finished'
.
format
(
self
.
name
,
self
.
parent
.
name
))
UtilsMongoDb
.
closeMongo
(
self
.
parent
.
mongoId
,
status
=
'finished'
)
logger
.
debug
(
"In {0}, parent {1}, finished"
.
format
(
self
.
name
,
self
.
parent
.
name
)
)
UtilsMongoDb
.
closeMongo
(
self
.
parent
.
mongoId
,
status
=
"finished"
)
else
:
logger
.
error
(
'In {0}, parent {1}, timeout detected'
.
format
(
self
.
name
,
self
.
parent
.
name
))
UtilsMongoDb
.
closeMongo
(
self
.
parent
.
mongoId
,
status
=
'timeout'
)
logger
.
error
(
"In {0}, parent {1}, timeout detected"
.
format
(
self
.
name
,
self
.
parent
.
name
)
)
UtilsMongoDb
.
closeMongo
(
self
.
parent
.
mongoId
,
status
=
"timeout"
)
pypushflow/UtilsMongoDb.py
View file @
1250c8a0
...
...
@@ -31,9 +31,12 @@ try:
import
pymongo
import
bson
from
bson.objectid
import
ObjectId
USE_MONGODB
=
True
except
:
print
(
"Error when trying to import pymongo and/or bson - no MongoDB connection possible"
)
except
ImportError
:
print
(
"Error when trying to import pymongo and/or bson - no MongoDB connection possible"
)
USE_MONGODB
=
False
...
...
@@ -44,7 +47,7 @@ def getDateTimeString():
def
getMongoUrl
():
mongoUrl
=
None
if
USE_MONGODB
:
mongoUrl
=
os
.
environ
.
get
(
'
PYPUSHFLOW_MONGOURL
'
,
None
)
mongoUrl
=
os
.
environ
.
get
(
"
PYPUSHFLOW_MONGOURL
"
,
None
)
return
mongoUrl
...
...
@@ -52,21 +55,21 @@ def initMongo(name):
workflowId
=
None
mongoUrl
=
getMongoUrl
()
if
mongoUrl
is
not
None
:
initiator
=
os
.
environ
.
get
(
'
PYPUSHFLOW_INITIATOR
'
,
'
Unknown
'
)
host
=
os
.
environ
.
get
(
'
PYPUSHFLOW_HOST
'
,
'
Unknown
'
)
port
=
os
.
environ
.
get
(
'
PYPUSHFLOW_PORT
'
,
'
Unknown
'
)
objectId
=
os
.
environ
.
get
(
'
PYPUSHFLOW_OBJECTID
'
,
str
(
bson
.
objectid
.
ObjectId
()))
initiator
=
os
.
environ
.
get
(
"
PYPUSHFLOW_INITIATOR
"
,
"
Unknown
"
)
host
=
os
.
environ
.
get
(
"
PYPUSHFLOW_HOST
"
,
"
Unknown
"
)
port
=
os
.
environ
.
get
(
"
PYPUSHFLOW_PORT
"
,
"
Unknown
"
)
objectId
=
os
.
environ
.
get
(
"
PYPUSHFLOW_OBJECTID
"
,
str
(
bson
.
objectid
.
ObjectId
()))
collection
=
pymongo
.
MongoClient
(
mongoUrl
).
pybes
.
pybes
workflowData
=
{
'
_id
'
:
bson
.
objectid
.
ObjectId
(
objectId
),
'
Request ID
'
:
objectId
,
'
startTime
'
:
getDateTimeString
(),
'
initiator
'
:
initiator
,
'
host
'
:
host
,
'
port
'
:
port
,
'
name
'
:
name
,
'
status
'
:
'
started
'
,
'
actors
'
:
[]
"
_id
"
:
bson
.
objectid
.
ObjectId
(
objectId
),
"
Request ID
"
:
objectId
,
"
startTime
"
:
getDateTimeString
(),
"
initiator
"
:
initiator
,
"
host
"
:
host
,
"
port
"
:
port
,
"
name
"
:
name
,
"
status
"
:
"
started
"
,
"
actors
"
:
[]
,
}
insertOneResult
=
collection
.
insert_one
(
workflowData
)
workflowId
=
insertOneResult
.
inserted_id
...
...
@@ -77,29 +80,29 @@ def setMongoStatus(workflowId, status):