Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
workflow
pypushflow
Commits
9125b26f
Commit
9125b26f
authored
Dec 09, 2021
by
Wout De Nolf
Browse files
Merge branch 'logging_format' into 'main'
improve logging by adding log context (workflow name, actor name) See merge request
!21
parents
e5aa438f
7a458935
Pipeline
#61749
passed with stages
in 37 seconds
Changes
17
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
pypushflow/AbstractActor.py
View file @
9125b26f
...
...
@@ -23,12 +23,9 @@ __authors__ = ["O. Svensson"]
__license__
=
"MIT"
__date__
=
"28/05/2019"
import
logging
import
pprint
from
pypushflow.ThreadCountingActor
import
ThreadCountingActor
logger
=
logging
.
getLogger
(
"pypushflow"
)
class
AbstractActor
(
ThreadCountingActor
):
def
__init__
(
self
,
parent
=
None
,
name
=
None
,
**
kw
):
...
...
@@ -38,19 +35,18 @@ class AbstractActor(ThreadCountingActor):
self
.
started
=
False
self
.
finished
=
False
def
__str__
(
self
)
->
str
:
return
self
.
name
def
connect
(
self
,
actor
):
logger
.
debug
(
'Connecting actor "{0}" to actor "{1}"'
.
format
(
self
.
name
,
actor
.
name
)
)
self
.
logger
.
debug
(
"connect to actor '%s'"
,
actor
.
name
)
self
.
listDownStreamActor
.
append
(
actor
)
def
trigger
(
self
,
inData
):
self
.
logger
.
info
(
"triggered with inData =
\n
%s"
,
pprint
.
pformat
(
inData
))
self
.
setStarted
()
self
.
setFinished
()
for
actor
in
self
.
listDownStreamActor
:
logger
.
debug
(
'In actor "{0}", triggering actor "{1}"'
.
format
(
self
.
name
,
actor
.
name
)
)
actor
.
trigger
(
inData
)
def
uploadInDataToMongo
(
self
,
actorData
=
None
,
script
=
None
):
...
...
@@ -79,12 +75,12 @@ class AbstractActor(ThreadCountingActor):
return
self
.
started
def
setStarted
(
self
):
logger
.
debug
(
"Setting started of {0} to True"
.
format
(
self
.
name
)
)
self
.
logger
.
info
(
"started"
)
self
.
started
=
True
def
hasFinished
(
self
):
return
self
.
finished
def
setFinished
(
self
):
logger
.
debug
(
"Setting finished of {0} to True"
.
format
(
self
.
name
)
)
self
.
logger
.
info
(
"finished"
)
self
.
finished
=
True
pypushflow/JoinActor.py
View file @
9125b26f
...
...
@@ -23,6 +23,7 @@ __authors__ = ["O. Svensson"]
__license__
=
"MIT"
__date__
=
"28/05/2019"
import
pprint
from
pypushflow.AbstractActor
import
AbstractActor
...
...
@@ -36,6 +37,7 @@ class JoinActor(AbstractActor):
self
.
numberOfThreads
+=
1
def
trigger
(
self
,
inData
):
self
.
logger
.
info
(
"triggered with inData =
\n
%s"
,
pprint
.
pformat
(
inData
))
self
.
setStarted
()
self
.
setFinished
()
self
.
listInData
.
append
(
inData
)
...
...
pypushflow/PythonActor.py
View file @
9125b26f
...
...
@@ -26,7 +26,6 @@ __date__ = "28/05/2019"
import
os
import
time
import
pprint
import
logging
import
datetime
import
traceback
import
importlib
...
...
@@ -34,9 +33,7 @@ import multiprocessing
import
multiprocessing.pool
from
pypushflow.AbstractActor
import
AbstractActor
logger
=
logging
.
getLogger
(
"pypushflow"
)
from
pypushflow.logutils
import
PyPushflowLoggedObject
#############################################################################
...
...
@@ -72,21 +69,20 @@ class Edna2Pool(multiprocessing.pool.Pool):
#############################################################################
class
AsyncFactory
:
def
__init__
(
self
,
func
,
callback
=
None
,
errorCallback
=
None
):
class
AsyncFactory
(
PyPushflowLoggedObject
):
def
__init__
(
self
,
func
,
callback
=
None
,
errorCallback
=
None
,
parent
=
None
):
super
().
__init__
(
parent
=
parent
)
self
.
func
=
func
self
.
callback
=
callback
self
.
errorCallback
=
errorCallback
self
.
pool
=
Edna2Pool
(
1
)
def
call
(
self
,
*
args
,
**
kwargs
):
logger
.
debug
(
"Before apply_async, func=%s, callback=%s, errorCallback=%s"
,
self
.
func
,
self
.
callback
,
self
.
errorCallback
,
self
.
logger
.
debug
(
"asynchronous execution of '%s.%s'"
,
self
.
func
.
__module__
,
self
.
func
.
__name__
,
)
logger
.
debug
(
"args=%s, kwargs=%s"
,
args
,
kwargs
)
self
.
pool
.
apply_async
(
self
.
func
,
args
=
args
,
...
...
@@ -95,12 +91,6 @@ class AsyncFactory:
error_callback
=
self
.
errorCallback
,
)
self
.
pool
.
close
()
logger
.
debug
(
"After apply_async, func=%s, callback=%s, errorCallback=%s"
,
self
.
func
,
self
.
callback
,
self
.
errorCallback
,
)
class
PythonActor
(
AbstractActor
):
...
...
@@ -114,11 +104,12 @@ class PythonActor(AbstractActor):
self
.
inData
=
None
self
.
af
=
None
def
connectOnError
(
self
,
errorHandler
):
self
.
listErrorHandler
.
append
(
errorHandler
)
def
connectOnError
(
self
,
actor
):
self
.
logger
.
debug
(
"connect to error handler '%s'"
,
actor
.
name
)
self
.
listErrorHandler
.
append
(
actor
)
def
trigger
(
self
,
inData
:
dict
):
logger
.
debug
(
"In trigger %s,
inData = %s"
,
self
.
name
,
pprint
.
pformat
(
inData
))
self
.
logger
.
info
(
"triggered with
inData =
\n
%s"
,
pprint
.
pformat
(
inData
))
self
.
setStarted
()
self
.
inData
=
dict
(
inData
)
self
.
uploadInDataToMongo
(
actorData
=
{
"inData"
:
inData
},
script
=
self
.
script
)
...
...
@@ -126,7 +117,7 @@ class PythonActor(AbstractActor):
try
:
module
=
importlib
.
import_module
(
os
.
path
.
splitext
(
self
.
script
)[
0
])
except
Exception
as
e
:
logger
.
error
(
"Error when trying to import script '%s'"
,
self
.
script
)
self
.
logger
.
error
(
"Error when trying to import script '%s'"
,
self
.
script
)
time
.
sleep
(
1
)
self
.
errorHandler
(
e
)
return
...
...
@@ -136,7 +127,10 @@ class PythonActor(AbstractActor):
errorHandler
,
):
self
.
af
=
AsyncFactory
(
module
.
run
,
callback
=
resultHandler
,
errorCallback
=
errorHandler
module
.
run
,
callback
=
resultHandler
,
errorCallback
=
errorHandler
,
parent
=
self
,
)
self
.
af
.
call
(
**
self
.
inData
)
...
...
@@ -144,7 +138,6 @@ class PythonActor(AbstractActor):
"""Async callback in case of success"""
try
:
# Handle the result
logger
.
debug
(
"In resultHandler for '%s'"
,
self
.
name
)
self
.
_finishedSuccess
(
result
)
# Trigger actors
...
...
@@ -158,12 +151,7 @@ class PythonActor(AbstractActor):
"""Async callback in case of exception"""
try
:
# Handle the result
logger
.
debug
(
"In errorHandler for '%s'"
,
self
.
name
)
logger
.
error
(
"Error in python actor '%s'! Not running down stream actors %s"
,
self
.
name
,
[
actor
.
name
for
actor
in
self
.
listDownStreamActor
],
)
self
.
_logException
(
exception
)
result
=
self
.
_parseException
(
exception
)
self
.
_finishedFailure
(
result
)
...
...
@@ -172,18 +160,28 @@ class PythonActor(AbstractActor):
downstreamData
[
"WorkflowException"
]
=
result
self
.
_triggerErrorHandlers
(
downstreamData
)
except
Exception
:
logger
.
exception
(
"In errorHandler for '%s'"
,
self
.
name
)
self
.
logger
.
exception
(
"In errorHandler for '%s'"
,
self
.
name
)
def
_parseException
(
self
,
exception
:
Exception
)
->
dict
:
errorMessage
=
str
(
exception
)
def
_logException
(
self
,
exception
:
Exception
)
->
None
:
if
isinstance
(
exception
.
__cause__
,
multiprocessing
.
pool
.
RemoteTraceback
):
exception
=
exception
.
__cause__
log
ger
.
error
(
exception
)
log
func
=
self
.
logger
.
error
elif
isinstance
(
exception
,
multiprocessing
.
pool
.
MaybeEncodingError
):
# This exception has no traceback
log
ger
.
error
(
exception
)
log
func
=
self
.
logger
.
error
else
:
logger
.
exception
(
exception
)
logfunc
=
self
.
logger
.
exception
logfunc
(
"Error in python actor '%s'!
\n
Not running down stream actors %s
\n
Exception:%s"
,
self
.
name
,
[
actor
.
name
for
actor
in
self
.
listDownStreamActor
],
exception
,
)
def
_parseException
(
self
,
exception
:
Exception
)
->
dict
:
errorMessage
=
str
(
exception
)
if
isinstance
(
exception
.
__cause__
,
multiprocessing
.
pool
.
RemoteTraceback
):
exception
=
exception
.
__cause__
traceBack
=
traceback
.
format_exception
(
type
(
exception
),
exception
,
exception
.
__traceback__
)
...
...
@@ -194,11 +192,10 @@ class PythonActor(AbstractActor):
def
_triggerDownStreamActors
(
self
,
downstreamData
:
dict
):
for
downStreamActor
in
self
.
listDownStreamActor
:
logger
.
debug
(
"In trigger %s, triggering actor %s, inData=%s"
,
self
.
name
,
self
.
logger
.
debug
(
"trigger actor '%s' with inData =
\n
%s"
,
downStreamActor
.
name
,
downstreamData
,
pprint
.
pformat
(
downstreamData
)
,
)
downStreamActor
.
trigger
(
downstreamData
)
...
...
@@ -206,9 +203,6 @@ class PythonActor(AbstractActor):
for
errorHandler
in
self
.
listErrorHandler
:
errorHandler
.
trigger
(
downstreamData
)
if
self
.
parentErrorHandler
is
not
None
:
logger
.
error
(
"Trigger on error on errorHandler '%s'"
,
self
.
parentErrorHandler
.
name
)
self
.
parentErrorHandler
.
triggerOnError
(
inData
=
downstreamData
)
def
_finishedSuccess
(
self
,
result
:
dict
):
...
...
pypushflow/RouterActor.py
View file @
9125b26f
...
...
@@ -23,12 +23,9 @@ __authors__ = ["O. Svensson"]
__license__
=
"MIT"
__date__
=
"28/05/2019"
import
pprint
from
pypushflow.AbstractActor
import
AbstractActor
import
logging
logger
=
logging
.
getLogger
(
"pypushflow"
)
class
RouterActor
(
AbstractActor
):
def
__init__
(
...
...
@@ -38,7 +35,7 @@ class RouterActor(AbstractActor):
name
=
"Router"
,
itemName
=
None
,
listPort
=
None
,
**
kw
**
kw
,
):
super
().
__init__
(
parent
=
parent
,
name
=
name
,
**
kw
)
self
.
errorHandler
=
errorHandler
...
...
@@ -53,9 +50,7 @@ class RouterActor(AbstractActor):
def
connect
(
self
,
actor
,
expectedValue
=
"other"
):
if
expectedValue
!=
"other"
and
expectedValue
not
in
self
.
listPort
:
raise
RuntimeError
(
"Port {0} not defined for router actor {1}!"
.
format
(
expectedValue
,
self
.
name
)
f
"Port
{
expectedValue
}
not defined for router actor
{
self
.
name
}
!"
)
if
expectedValue
in
self
.
dictValues
:
self
.
dictValues
[
expectedValue
].
append
(
actor
)
...
...
@@ -63,18 +58,14 @@ class RouterActor(AbstractActor):
self
.
dictValues
[
expectedValue
]
=
[
actor
]
def
trigger
(
self
,
inData
):
logger
.
debug
(
'In router actor "{0}"'
.
format
(
self
.
name
))
self
.
logger
.
info
(
"triggered with inData =
\n
%s"
,
pprint
.
pformat
(
inData
))
self
.
setStarted
()
self
.
setFinished
()
listActor
=
None
if
self
.
itemName
in
inData
:
logger
.
debug
(
'In router actor "{0}", itemName {1} in inData'
.
format
(
self
.
name
,
self
.
itemName
)
)
self
.
logger
.
debug
(
"router item = '%s'"
,
self
.
itemName
)
value
=
inData
[
self
.
itemName
]
logger
.
debug
(
'In
router
actor "{0}", value = {1}'
.
format
(
self
.
n
ame
,
value
)
)
self
.
logger
.
debug
(
"
router
item value = %s"
,
self
.
itemN
ame
,
value
)
if
value
in
[
None
,
"None"
,
"null"
]:
value
=
"null"
elif
type
(
value
)
==
bool
:
...
...
@@ -85,17 +76,10 @@ class RouterActor(AbstractActor):
if
not
isinstance
(
value
,
dict
)
and
value
in
self
.
dictValues
:
listActor
=
self
.
dictValues
[
value
]
if
listActor
is
None
:
logger
.
debug
(
'In
router
actor "{0}", actor is None'
)
self
.
logger
.
debug
(
"no
router
destinations for inData"
)
if
"other"
in
self
.
dictValues
:
listActor
=
self
.
dictValues
[
"other"
]
else
:
raise
RuntimeError
(
'No "other" port for router actor "{0}"'
.
format
(
self
.
name
)
)
raise
RuntimeError
(
f
"No 'other' port for router actor '
{
self
.
name
}
'"
)
for
actor
in
listActor
:
logger
.
debug
(
'In router actor "{0}", triggering actor "{1}"'
.
format
(
self
.
name
,
actor
.
name
)
)
actor
.
trigger
(
inData
)
pypushflow/StopActor.py
View file @
9125b26f
...
...
@@ -23,13 +23,10 @@ __authors__ = ["O. Svensson"]
__license__
=
"MIT"
__date__
=
"28/05/2019"
import
logging
import
pprint
from
pypushflow
import
Submodel
from
pypushflow.ThreadCountingActor
import
ThreadCountingActor
logger
=
logging
.
getLogger
(
"pypushflow"
)
class
StopActor
(
ThreadCountingActor
):
def
__init__
(
self
,
parent
=
None
,
errorHandler
=
None
,
name
=
"Stop actor"
,
**
kw
):
...
...
@@ -38,9 +35,7 @@ class StopActor(ThreadCountingActor):
self
.
outData
=
None
def
trigger
(
self
,
inData
):
logger
.
debug
(
"In trigger {0}, errorHandler = {1}"
.
format
(
self
.
name
,
self
.
errorHandler
)
)
self
.
logger
.
info
(
"triggered with inData =
\n
%s"
,
pprint
.
pformat
(
inData
))
if
self
.
parent
is
not
None
and
not
isinstance
(
self
.
parent
,
Submodel
.
Submodel
):
# Parent is a Workflow
self
.
outData
=
inData
...
...
@@ -51,18 +46,10 @@ class StopActor(ThreadCountingActor):
def
join
(
self
,
timeout
=
7200
):
if
self
.
parent
is
not
None
:
logger
.
debug
(
"In {0}, parent {1}, before wait_threads_finished"
.
format
(
self
.
name
,
self
.
parent
.
name
)
)
self
.
logger
.
debug
(
"wait for scheduler threads to be finished"
)
success
=
self
.
_wait_threads_finished
(
timeout
=
timeout
)
if
self
.
parent
is
not
None
:
logger
.
debug
(
"In {0}, parent {1}, after wait_threads_finished"
.
format
(
self
.
name
,
self
.
parent
.
name
)
)
self
.
logger
.
debug
(
"scheduler threads are finished"
)
self
.
_finalizeInMongo
(
success
)
return
success
...
...
@@ -70,14 +57,8 @@ class StopActor(ThreadCountingActor):
if
self
.
parent
is
None
:
return
if
success
:
logger
.
debug
(
"In {0}, parent {1}, finished"
.
format
(
self
.
name
,
self
.
parent
.
name
)
)
self
.
logger
.
debug
(
"finished"
)
self
.
parent
.
setStatus
(
"finished"
)
else
:
logger
.
error
(
"In {0}, parent {1}, timeout detected"
.
format
(
self
.
name
,
self
.
parent
.
name
)
)
self
.
logger
.
error
(
"timeout detected"
)
self
.
parent
.
setStatus
(
"timeout"
)
pypushflow/Submodel.py
View file @
9125b26f
...
...
@@ -24,11 +24,9 @@ __license__ = "MIT"
__date__
=
"28/05/2019"
import
logg
in
g
import
ppr
in
t
from
pypushflow.ThreadCountingActor
import
ThreadCountingActor
logger
=
logging
.
getLogger
(
"pypushflow"
)
from
pypushflow.logutils
import
PyPushflowLoggedObject
class
Port
(
ThreadCountingActor
):
...
...
@@ -39,32 +37,35 @@ class Port(ThreadCountingActor):
self
.
inPortTrigger
=
None
def
connect
(
self
,
actor
):
logger
.
debug
(
"
C
onnect
{0} -> actorName {1}"
.
format
(
self
.
name
,
actor
.
name
)
)
self
.
logger
.
debug
(
"
c
onnect
ig to '%s'"
,
actor
.
name
)
self
.
listActor
.
append
(
actor
)
def
setTrigger
(
self
,
trigger
):
self
.
inPortTrigger
=
trigger
def
trigger
(
self
,
*
args
,
**
kwargs
):
logger
.
debug
(
"In {0} trigger"
.
format
(
self
.
name
))
def
trigger
(
self
,
inData
):
self
.
logger
.
info
(
"triggered with inData =
\n
%s"
,
pprint
.
pformat
(
inData
),
)
if
len
(
self
.
listActor
)
>
0
:
for
actor
in
self
.
listActor
:
logger
.
debug
(
"In trigger
{0}
-> actorName
{1}"
.
format
(
self
.
errorHandler
.
name
,
actor
.
name
)
self
.
logger
.
debug
(
"In trigger
'%s'
-> actorName
'%s'"
,
self
.
errorHandler
.
name
,
actor
.
name
,
)
actor
.
trigger
(
*
args
,
**
kwargs
)
actor
.
trigger
(
inData
)
if
self
.
inPortTrigger
is
not
None
:
logger
.
debug
(
"In
{0}
trigger, trigger =
{1}"
.
format
(
self
.
errorHandler
.
name
,
self
.
inPortTrigger
)
self
.
logger
.
debug
(
"In
'%s'
trigger, trigger =
'%s'"
,
self
.
errorHandler
.
name
,
self
.
inPortTrigger
,
)
self
.
inPortTrigger
(
*
args
,
**
kwargs
)
self
.
inPortTrigger
(
inData
)
class
Submodel
:
class
Submodel
(
PyPushflowLoggedObject
)
:
def
__init__
(
self
,
parent
=
None
,
...
...
@@ -73,6 +74,7 @@ class Submodel:
portNames
=
[
"In"
,
"Out"
],
thread_counter
=
None
,
):
super
().
__init__
(
log_metadata
=
{
"submodel"
:
name
},
parent
=
parent
)
self
.
parent
=
parent
self
.
name
=
name
if
errorHandler
is
None
:
...
...
@@ -97,38 +99,23 @@ class Submodel:
return
self
.
parent
.
getActorPath
()
+
"/"
+
self
.
name
.
replace
(
"%"
,
" "
)
def
getPort
(
self
,
portName
):
logger
.
debug
(
"In {0} getPort, portName = {1}"
.
format
(
self
.
name
,
portName
))
return
self
.
dictPort
[
portName
]
def
connect
(
self
,
actor
,
portName
=
"Out"
):
logger
.
debug
(
"In {0} connect, portName = {2} -> actorName = {1}"
.
format
(
self
.
name
,
actor
.
name
,
portName
)
)
self
.
dictPort
[
portName
].
connect
(
actor
)
def
connectOnError
(
self
,
actor
):
logger
.
debug
(
"In connectOnError in subModule {0}, actor name {1}"
.
format
(
self
.
name
,
actor
.
name
)
)
self
.
logger
.
debug
(
"connect to error handler '%s'"
,
actor
.
name
)
self
.
listOnErrorActor
.
append
(
actor
)
def
triggerOnError
(
self
,
*
args
,
**
kwargs
):
def
triggerOnError
(
self
,
inData
):
self
.
logger
.
info
(
"triggered due to error with inData =
\n
%s"
,
pprint
.
pformat
(
inData
)
)
for
onErrorActor
in
self
.
listOnErrorActor
:
logger
.
debug
(
"In triggerOnError in subModule {0}, trigger actor {1}, inData = {2}"
.
format
(
self
.
name
,
onErrorActor
.
name
,
args
[
0
]
)
)
onErrorActor
.
trigger
(
*
args
,
**
kwargs
)
onErrorActor
.
trigger
(
inData
)
if
self
.
errorHandler
is
not
None
:
logger
.
error
(
'Trigger on error on errorHandler "{0}"'
.
format
(
self
.
errorHandler
.
name
)
)
self
.
errorHandler
.
triggerOnError
(
*
args
,
**
kwargs
)
self
.
errorHandler
.
triggerOnError
(
inData
)
def
addActorRef
(
self
,
actorRef
):
if
self
.
parent
is
not
None
:
...
...
pypushflow/ThreadCounter.py
View file @
9125b26f
import
logging
from
threading
import
Condition
from
pypushflow.logutils
import
PyPushflowLoggedObject
logger
=
logging
.
getLogger
(
"pypushflow"
)
class
ThreadCounter
:
class
ThreadCounter
(
PyPushflowLoggedObject
):
"""Scheduling thread counter"""
def
__init__
(
self
):
def
__init__
(
self
,
parent
=
None
):
self
.
__counter
=
0
self
.
__condition
=
Condition
()
super
().
__init__
(
parent
=
parent
)
def
start_thread
(
self
,
msg
=
None
):
with
self
.
__condition
:
...
...
@@ -48,4 +46,4 @@ class ThreadCounter:
def
_log_counter_change
(
self
,
msg
=
None
):
if
msg
is
None
:
msg
=
"Thread counter changed"
logger
.
debug
(
"%s (%d threads running)"
,
msg
,
self
.
__counter
)
self
.
logger
.
debug
(
"%s (%d threads running)"
,
msg
,
self
.
__counter
)
pypushflow/ThreadCountingActor.py
View file @
9125b26f
from
functools
import
wraps
from
contextlib
import
contextmanager
from
pypushflow.logutils
import
PyPushflowLoggedObject
def
with_thread_context
(
trigger
):
...
...
@@ -26,7 +27,7 @@ def callback_with_end_thread(async_callback, end_thread, log_msg):
return
wrapper
class
ThreadCountingActor
:
class
ThreadCountingActor
(
PyPushflowLoggedObject
)
:
"""The `trigger` method of will increase the thread counter
at the start and decrease the thread counter at the end.
"""
...
...
@@ -36,6 +37,7 @@ class ThreadCountingActor:
raise
RuntimeError
(
"Actor name is None!"
)
if
thread_counter
is
None
:
raise
ValueError
(
"Actor requires a 'thread_counter' argument"
)
super
().
__init__
(
log_metadata
=
{
"actor"
:
name
},
parent
=
parent
)
self
.
name
=
name
self
.
parent
=
parent
if
parent
is
not
None
:
...
...
@@ -101,5 +103,5 @@ class ThreadCountingActor:
def
_wait_threads_finished
(
self
,
**
kw
):
return
self
.
__thread_counter
.
wait_threads_finished
(
**
kw
)
def
trigger
(
self
,
**
kw
):
def
trigger
(
self
,
inData
):
raise
NotImplementedError
pypushflow/Workflow.py
View file @
9125b26f
...
...
@@ -24,79 +24,43 @@ __license__ = "MIT"
__date__
=
"28/05/2019"
import
os
import
pprint
import
logging
import
pathlib
import
logging.handlers
from
pypushflow.persistence
import
db_client
from
pypushflow.logutils
import
PyPushflowLoggedObject
from
pypushflow.logutils
import
basicConfig
class
Workflow
(
o
bject
):
class
Workflow
(
PyPushflowLoggedO
bject
):
def
__init__
(
self
,
name
):
basicConfig
(
filename
=
name
)
super
().
__init__
(
log_metadata
=
{
"workflow"
:
name
})
self
.
logger
.
info
(
"
\n\n
Starting new workflow %s
\n
"
,
name
)
self
.
name
=
name
self
.
listOnErrorActor
=
[]
self
.
db_client
=
db_client
()
self
.
db_client
.
startWorkflow
(
name
)
self
.
listActorRef
=
[]
self
.
logger
=
self
.
initLogger
(
name
)
def
connectOnError
(
self
,
actor
):
self
.
logger
.
debug
(
"In Workflow '{0}' connectOnError, actor name {1}"
.
format
(
self
.
name
,
actor
.
name
)
)
self
.
logger
.
debug
(
"connect to error handler '%s'"
,
actor
.
name
)
self
.
listOnErrorActor
.
append
(
actor
)
def
triggerOnError
(
self
,
inData
):
self
.
logger
.
debug
(
"In Workflow '{0}' triggerOnError, inData:"
.
format
(
self
.
name
))
self
.
logger
.
debug
(
pprint
.
pformat
(
inData
))
self
.
logger
.
info
(
"triggered due to error with inData =
\n
%s"
,
pprint
.
pformat
(
inData
)
)
for
onErrorActor
in
self
.
listOnErrorActor
:
self
.
logger
.
debug
(
"In Workflow '{0}' triggerOnError, triggering actor name {1}"
.
format
(
self
.
name
,
onErrorActor
.
name
)
)
onErrorActor
.
trigger
(
inData
)
def
getActorPath
(
self
):
return
"/"
+
self
.
name