Skip to content
Snippets Groups Projects
Commit 32001140 authored by Linus Pithan's avatar Linus Pithan Committed by Linus Pithan
Browse files

[killing scans] some tests that are failing

parent add2872c
No related merge requests found
......@@ -230,3 +230,153 @@ def test_exception_in_start_and_stop_and_preset(session, first_iteration, preset
assert isinstance(e, ZeroDivisionError)
else:
assert False
class EverMovingAxis:
def __init__(self):
self._position = 0
@property
def position(self):
return self._position
@position.setter
def position(self, value):
i = 1
while True:
print(f"\r{i}", end="")
i += 1
gevent.sleep(.1)
def test_scan_kill_during_pepare(default_session):
diode = default_session.config.get("diode")
ax = SoftAxis("TestAxis", EverMovingAxis())
# ~ with gevent.Timeout(1):
# ~ scans.ascan(ax,1,2,3,.1,diode,save=False)
s = scans.ascan(ax, 1, 2, 3, .1, diode, save=False, run=False)
g = gevent.spawn(s.run)
gevent.sleep(1)
print("greenlet kill")
g.kill(block=False)
# g.kill()
gevent.sleep(1)
assert g.successful() ## is g.successful a good test to do?
def test_scan_kill_during_pepare_timeout(default_session):
diode = default_session.config.get("diode")
ax = SoftAxis("TestAxis", EverMovingAxis())
# ~ with gevent.Timeout(1):
# ~ scans.ascan(ax,1,2,3,.1,diode,save=False)
s = scans.ascan(ax, 1, 2, 3, .1, diode, save=False, run=False)
g = gevent.spawn(s.run)
gevent.sleep(1)
print("send gevent.Timeout")
g.kill(gevent.Timeout, block=False)
gevent.sleep(1)
assert g.successful()
def test_scan_kill_during_pepare_greenlet(default_session):
diode = default_session.config.get("diode")
ax = SoftAxis("TestAxis", EverMovingAxis())
s = scans.ascan(ax, 1, 2, 3, .1, diode, save=False, run=False)
def foo(s):
with gevent.Timeout(.5):
s.run()
g = gevent.spawn(foo, s)
gevent.sleep(1)
assert s.state.name == "DONE" # needs to be changed to KILLED soon
g.kill()
assert s.state.name == "DONE" # needs to be changed to KILLED soon
def test_scan_kill_during_pepare_keyboardinterrupt(default_session):
diode = default_session.config.get("diode")
ax = SoftAxis("TestAxis", EverMovingAxis())
s = scans.ascan(ax, 1, 2, 3, .1, diode, save=False, run=False)
g = gevent.spawn(s.run)
gevent.sleep(1)
print("send KeyboardInterrupt")
g.kill(KeyboardInterrupt, block=False)
gevent.sleep(1)
assert g.successful()
class EverMovingAxis2:
def __init__(self):
self._position = 0
@property
def position(self):
return self._position
@position.setter
def position(self, value):
if value < 1:
self._postion = value
return
i = 1
while True:
print(f"\r{i}", end="")
i += 1
gevent.sleep(.1)
def test_scan_kill_during_running_keyboardinterrupt(default_session):
diode = default_session.config.get("diode")
ax = SoftAxis("TestAxis", EverMovingAxis2())
s = scans.ascan(ax, 1, 2, 3, .1, diode, save=False, run=False)
g = gevent.spawn(s.run)
gevent.sleep(1)
print("send KeyboardInterrupt")
g.kill(KeyboardInterrupt, block=False)
gevent.sleep(1)
assert g.successful()
class EverMovingAxis3:
def __init__(self):
self._position = 0
self._stop_flag = False
@property
def position(self):
return self._position
@position.setter
def position(self, value):
self._stop_flag = False
i = 1
while not self._stop_flag:
print(f"\r{i}", end="")
i += 1
gevent.sleep(0)
def stop(self):
self._stop_flag = True
def test_scan_kill_keyboardinterrupt_with_stop(default_session):
diode = default_session.config.get("diode")
ax = SoftAxis("TestAxis", EverMovingAxis3(), stop="stop")
s = scans.ascan(ax, 1, 2, 3, .1, diode, save=False, run=False)
g = gevent.spawn(s.run)
gevent.sleep(1)
print("send KeyboardInterrupt")
with pytest.raises(KeyboardInterrupt):
g.kill(KeyboardInterrupt, block=False)
gevent.sleep(1)
assert g.successful()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment