Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
workflow
pypushflow
Commits
46eaefd8
Commit
46eaefd8
authored
Jun 29, 2021
by
Wout De Nolf
Browse files
Add ThreadCountingActor
parent
48e05e96
Changes
2
Hide whitespace changes
Inline
Side-by-side
pypushflow/ThreadCountingActor.py
0 → 100644
View file @
46eaefd8
from
functools
import
wraps
from
contextlib
import
contextmanager
def
with_thread_context
(
trigger
):
"""Wraps the `trigger` method of all derived classes of ThreadCountingActor"""
@
wraps
(
trigger
)
def
wrapper
(
self
,
*
args
,
**
kw
):
with
self
.
_thread_context
():
return
trigger
(
self
,
*
args
,
**
kw
)
return
wrapper
def
callback_with_end_thread
(
async_callback
,
end_thread
,
log_msg
):
"""Wraps a async_callback"""
@
wraps
(
async_callback
)
def
wrapper
(
*
args
,
**
kw
):
try
:
return
async_callback
(
*
args
,
**
kw
)
finally
:
end_thread
(
msg
=
log_msg
)
return
wrapper
class
ThreadCountingActor
:
"""The `trigger` method of will increase the thread counter
at the start and decrease the thread counter at the end.
"""
def
__init__
(
self
,
name
=
None
,
parent
=
None
,
thread_counter
=
None
):
if
name
is
None
:
raise
RuntimeError
(
"Actor name is None!"
)
if
thread_counter
is
None
:
raise
ValueError
(
"Actor requires a 'thread_counter' argument"
)
self
.
name
=
name
self
.
parent
=
parent
if
parent
is
not
None
:
parent
.
addActorRef
(
self
)
self
.
__thread_counter
=
thread_counter
self
.
__in_thread_context
=
False
self
.
__postpone_end_thread
=
False
def
__init_subclass__
(
subcls
,
**
kw
):
"""Wrap the `trigger` method"""
super
().
__init_subclass__
(
**
kw
)
subcls
.
trigger
=
with_thread_context
(
subcls
.
trigger
)
@
contextmanager
def
_thread_context
(
self
):
"""Re-entrant context manager that starts a thread
on first entrance and ends a thread on last exit,
unless the thread ending is post-poned until after
and async callback.
"""
if
self
.
__in_thread_context
:
yield
return
self
.
__thread_counter
.
start_thread
(
msg
=
"Thread started for "
+
repr
(
self
.
name
))
try
:
self
.
__in_thread_context
=
True
self
.
__postpone_end_thread
=
False
try
:
yield
finally
:
self
.
__in_thread_context
=
False
finally
:
if
self
.
__postpone_end_thread
:
self
.
__postpone_end_thread
=
False
else
:
self
.
__thread_counter
.
end_thread
(
msg
=
"Thread ended for "
+
repr
(
self
.
name
)
)
@
contextmanager
def
_postpone_end_thread
(
self
,
*
async_callbacks
):
"""Post-pone thread ending until after an async callback is executed.
Only one of the async callbacks is expected to be called.
"""
if
self
.
__in_thread_context
:
self
.
__postpone_end_thread
=
True
try
:
async_callbacks
=
tuple
(
callback_with_end_thread
(
async_callback
,
self
.
__thread_counter
.
end_thread
,
"Thread ended for "
+
repr
(
self
.
name
),
)
for
async_callback
in
async_callbacks
)
yield
async_callbacks
except
BaseException
:
if
self
.
__in_thread_context
:
self
.
__postpone_end_thread
=
False
raise
def
_wait_threads_finished
(
self
,
**
kw
):
return
self
.
__thread_counter
.
wait_threads_finished
(
**
kw
)
def
trigger
(
self
,
**
kw
):
raise
NotImplementedError
pypushflow/test/test_threadcounteractor.py
0 → 100644
View file @
46eaefd8
import
unittest
from
time
import
sleep
from
threading
import
Lock
from
pypushflow.ThreadCountingActor
import
ThreadCountingActor
from
pypushflow.ThreadCounter
import
ThreadCounter
from
concurrent.futures
import
ThreadPoolExecutor
class
Counter
:
def
__init__
(
self
):
self
.
value
=
0
self
.
_lock
=
Lock
()
def
increment
(
self
):
with
self
.
_lock
:
self
.
value
+=
1
class
MyThreadCountingActor
(
ThreadCountingActor
):
def
__init__
(
self
,
thread_counter
,
downstream_actors
=
tuple
()):
super
().
__init__
(
thread_counter
=
thread_counter
,
name
=
"MyThreadCountingActor"
)
self
.
downstream_actors
=
downstream_actors
def
trigger
(
self
,
state
):
sleep
(
0.01
)
if
not
self
.
downstream_actors
:
state
[
"ntasks"
].
increment
()
for
actor
in
self
.
downstream_actors
:
actor
.
trigger
(
state
)
class
TestThreadCountingActor
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
thread_counter
=
ThreadCounter
()
def
test_multiple_threads
(
self
):
workers1
=
[
MyThreadCountingActor
(
self
.
thread_counter
)
for
_
in
range
(
3
)]
workers2
=
[
MyThreadCountingActor
(
self
.
thread_counter
,
workers1
)
for
_
in
range
(
5
)
]
workers3
=
[
MyThreadCountingActor
(
self
.
thread_counter
,
workers2
)
for
_
in
range
(
10
)
]
state
=
{
"ntasks"
:
Counter
()}
with
ThreadPoolExecutor
(
max_workers
=
10
)
as
executor
:
results
=
executor
.
map
(
lambda
w
:
w
.
trigger
(
state
),
workers3
)
self
.
thread_counter
.
wait_threads_finished
()
self
.
assertEqual
(
state
[
"ntasks"
].
value
,
150
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment