Commit af0c9a30 authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch '2833-killmask-and-interrupts' into 'master'

Refactor KillMask

See merge request !3816
parents b528a2ed 36a25f11
Pipeline #50586 failed with stages
in 116 minutes and 31 seconds
import sys
from contextlib import contextmanager
from functools import wraps
from contextlib import contextmanager
import logging
import asyncio
import aiogevent
from gevent import monkey
from gevent import greenlet, timeout, getcurrent
from gevent.timeout import string_types
from gevent import greenlet, timeout
import gevent
logger = logging.getLogger(__name__)
class KillMask:
"""All exceptions which are the result of a `kill(SomeException)`
call on the greenlet in which the KillMask context is entered,
will be DELAYED until the context exit. The only exception that
is not delayed is `gevent.Timeout`.
Upon exiting the `KillMask` context, only the last captured
exception is re-raised.
Optionally we can set a limit to the number of `kill` calls
that will be delayed. The prevent the possibility that a greenlet
can never be killed (see example below).
As this is entirely based on intercepting `kill` calls:
- this can never work in the main greenlet
- this only works in `BlissGreenlet` greenlets
Note: `KeyboardInterrupt` is always raised in the main greenlet,
never in other greenlets unless someone explicitely calls
`kill(KeyboardInterrupt)`. Apart from the later case, `KillMask`
does not delay `KeyboardInterrupt`.
A typical use case of `KillMask` is this:
def greenlet_main():
with KillMask(masked_kill_nb=1):
We assume this code does not run in the main greenlet. Whenever
you use a cooperative call, you could receive an exception in
`greenlet_main` originating from a `kill` on the executing greenlet.
The default exception is `GreenletExit` which is a `BaseException`.
In the exception could occur in three locations (assuming they
all use cooperative calls):
1. exception in <setup>: <cleanup> is not called
2. exception in <body>: <cleanup> is called thanks to try-finally
3. exception in <cleanup>: <cleanup> is fully executed thanks to `Killmask`
If <cleanup> is blocking, the `Killmask` prevents the executing
greenlet from being killed. Hence the usage of `masked_kill_nb=1`.
The first `kill` gets intercepted but the second `kill` does not.
def __init__(self, masked_kill_nb=-1):
masked_kill_nb: nb of masked kill
< 0 mean all kills are masked.
if > 0, at each kill attempt the counter decrements until 0, then the greenlet can be killed
masked_kill_nb: number of masked `kill` calls that will be delayed
> 0 this ammount of kills will be delayed
== 0 no kill is delayed
< 0 unlimited
self.__greenlet = gevent.getcurrent()
self.__kill_counter = masked_kill_nb
self.__masked_kill_nb = masked_kill_nb
self.__allowed_capture_nb = masked_kill_nb
self.__last_captured_exception = None
def _bliss_greenlet(self):
glt = gevent.getcurrent()
if isinstance(glt, BlissGreenlet):
return glt
elif glt.parent is not None:
logger.warning("KillMask will not work in the current greenlet: %s", glt)
return None
def __enter__(self):
self.__exception = None
MASKED_GREENLETS.setdefault(self.__greenlet, set()).add(self)
glt = self._bliss_greenlet
if glt is None:
self.__allowed_capture_nb = self.__masked_kill_nb
self.__last_captured_exception = None
def __exit__(self, exc_type, value, traceback):
if MASKED_GREENLETS[self.__greenlet]:
glt = self._bliss_greenlet
if glt is None:
if self.__exception is not None:
raise self.__exception
if not glt.kill_masks and self.__last_captured_exception is not None:
raise self.__last_captured_exception
def exception(self):
return self.__exception
def set_throw(self, exception):
if self.__kill_counter:
self.__exception = exception
else: # reach 0
self.__exception = None
cnt = self.__kill_counter
self.__kill_counter -= 1
return not cnt
def last_captured_exception(self):
return self.__last_captured_exception
def capture_exception(self, exception):
capture = bool(self.__allowed_capture_nb)
if capture:
self.__last_captured_exception = exception
self.__allowed_capture_nb -= 1
self.__last_captured_exception = None
return capture
......@@ -53,75 +116,111 @@ def AllowKill():
This will unmask the kill protection for the current greenlet.
current_greenlet = gevent.getcurrent()
previous_set_mask = MASKED_GREENLETS.pop(current_greenlet, set())
for killmask in previous_set_mask:
if killmask.exception:
raise killmask.exception
glt = gevent.getcurrent()
if isinstance(glt, BlissGreenlet):
with glt.disable_kill_masks() as kill_masks:
for kill_mask in kill_masks:
if kill_mask.last_captured_exception:
raise kill_mask.last_captured_exception
if previous_set_mask:
MASKED_GREENLETS[current_greenlet] = previous_set_mask
def protect_from_kill(fu):
def func(*args, **kwargs):
def protect_from_kill(method):
def wrapper(*args, **kwargs):
with KillMask():
return fu(*args, **kwargs)
return method(*args, **kwargs)
return func
return wrapper
def protect_from_one_kill(fu):
def func(*args, **kwargs):
def protect_from_one_kill(method):
def wrapper(*args, **kwargs):
with KillMask(masked_kill_nb=1):
return fu(*args, **kwargs)
return method(*args, **kwargs)
return func
return wrapper
# gevent.greenlet module patch
_ori_timeout = gevent.timeout.Timeout
_GeventTimeout = gevent.timeout.Timeout
_GeventGreenlet = greenlet.Greenlet
class BlissGreenlet(_GeventGreenlet):
"""The `KillMask``context can only work when entered in
a greenlet of type `BlissGreenlet`.
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.__kill_masks = set()
def kill_masks(self):
return self.__kill_masks
def disable_kill_masks(self):
kill_masks = self.__kill_masks
self.__kill_masks = set()
yield kill_masks
self.__kill_masks = kill_masks
class Greenlet(greenlet.Greenlet):
def throw(self, exception):
if isinstance(exception, gevent.timeout.Timeout):
# This is executed in the Hub which is the reason
# we cannot use gevent.local.local to store the
# kill masks for each greenlet.
if isinstance(exception, _GeventTimeout):
return super().throw(exception)
masks = MASKED_GREENLETS.get(self)
if masks:
for m in list(masks):
if m.set_throw(exception):
if self.__kill_masks:
captured_in_all_masks = True
for kill_mask in self.__kill_masks:
captured_in_all_masks &= kill_mask.capture_exception(exception)
if captured_in_all_masks:
def get(self, *args, **keys):
return super().get(*args, **keys)
except _ori_timeout as tmout:
t = Timeout(exception=tmout.exception)
raise t
except BlissTimeout:
except _GeventTimeout as tmout:
raise BlissTimeout(exception=tmout.exception)
# timeout patch
class Timeout(gevent.timeout.Timeout):
class BlissTimeout(_GeventTimeout):
"""KillMask can only work when timeouts are of type `BlissTimeout`.
def _on_expiration(self, prev_greenlet, ex):
if isinstance(prev_greenlet, Greenlet): # bliss greenlet
super(Greenlet, prev_greenlet).throw(ex)
else: # default
if isinstance(prev_greenlet, BlissGreenlet):
# Make sure the exception is not captured by
# a KillMask
super(BlissGreenlet, prev_greenlet).throw(ex)
def patch_gevent():
gevent.spawn = Greenlet.spawn
gevent.spawn_later = Greenlet.spawn_later
timeout.Timeout = Timeout
gevent.Timeout = Timeout
# For KillMask
gevent.spawn = BlissGreenlet.spawn
gevent.spawn_later = BlissGreenlet.spawn_later
timeout.Timeout = BlissTimeout
gevent.Timeout = BlissTimeout
# For backward compatibility
Greenlet = BlissGreenlet
Timeout = BlissTimeout
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment