Commit 481edcbf authored by payno's avatar payno

refactor wait_until to become wait and removing (wait_finished, wait_finished,...

refactor wait_until to become wait and removing (wait_finished, wait_finished, wait_started, wait_until, wait_states)
parent 39feae3b
......@@ -12,7 +12,7 @@ Hello world (minimal):
jobdef = oarjob.JobDefinition(command='echo "Hello word"')
job = jobdef.submit()
job.wait_finished()
job.wait()
if job.exit_code:
print('Failed:\n{}'.format(job.stderr))
......@@ -34,10 +34,10 @@ Hello world (specify resources and postpone execution):
command='echo "Hello word"',resource=resource)
job = jobdef.submit(hold=True)
job.wait_needsresume()
job.wait(states=('Hold', 'Suspended'))
# job is waiting for you to resume it
job.resume()
job.wait_finished()
job.wait()
if job.exit_code:
print('Failed:\n{}'.format(job.stderr))
......
......@@ -315,7 +315,7 @@
" job = definition(5).submit()\n",
" print(job)\n",
" print(\"Wait until finished ...\")\n",
" job.wait_finished()\n",
" job.wait()\n",
" print(job)\n",
" if job.exit_code:\n",
" print('Failed:\\n{}'.format(job.stderr))\n",
......@@ -382,12 +382,12 @@
" job = definition(5).submit(hold=True)\n",
" print(job)\n",
" print(\"Wait until enqueued ...\")\n",
" job.wait_needsresume()\n",
" job.wait(states=('Hold', 'Suspended'))\n",
" print(job)\n",
" print(\"Schedule job\")\n",
" job.resume()\n",
" print(\"Wait until finished ...\")\n",
" job.wait_finished()\n",
" job.wait()\n",
" print(job)\n",
" if job.exit_code:\n",
" print('Failed:\\n{}'.format(job.stderr))\n",
......@@ -525,7 +525,7 @@
" job = definition(60).submit()\n",
" print(job)\n",
" print(\"Wait until started ...\")\n",
" job.wait_started()\n",
" job.wait(states=('Running', 'Terminated', 'Error')\n",
" print(job)\n",
" print(\"Suspend job\")\n",
" try:\n",
......@@ -534,12 +534,12 @@
" print(\"This operation is currently not permitted\")\n",
" else:\n",
" print(\"Wait until suspended ...\")\n",
" job.wait_needsresume()\n",
" job.wait_needsresume(states=('Hold', 'Suspended'))\n",
" print(job)\n",
" print(\"Resume job\")\n",
" job.resume()\n",
" print(\"Wait until finished ...\")\n",
" job.wait_finished()\n",
" job.wait()\n",
" print(job)\n",
" if job.exit_code:\n",
" print('Failed:\\n{}'.format(job.stderr))\n",
......@@ -595,12 +595,12 @@
" job = definition(60).submit()\n",
" print(job)\n",
" print(\"Wait until started ...\")\n",
" job.wait_started()\n",
" job.wait(states=('Running', 'Terminated', 'Error'))\n",
" sleep(5)\n",
" print(\"Interrupt\")\n",
" job.interrupt()\n",
" print(\"Wait until finished ...\")\n",
" job.wait_finished()\n",
" job.wait()\n",
" print(job)\n",
" if job.exit_code:\n",
" print('Failed:\\n{}'.format(job.stderr))\n",
......
......@@ -27,7 +27,7 @@
"\n",
"jobdef = oarjob.JobDefinition(command='echo \"Hello word\"')\n",
"job = jobdef.submit()\n",
"job.wait_finished()\n",
"job.wait()\n",
"\n",
"if job.exit_code:\n",
" print('Failed:\\n{}'.format(job.stderr))\n",
......@@ -70,10 +70,10 @@
" command='echo \"Hello word\"',resource=resource)\n",
"\n",
"job = jobdef.submit(hold=True)\n",
"job.wait_needsresume()\n",
"job.wait(states=('Hold', 'Suspended'))\n",
"# job is waiting for you to resume it\n",
"job.resume()\n",
"job.wait_finished()\n",
"job.wait()\n",
"\n",
"if job.exit_code:\n",
" print('Failed:\\n{}'.format(job.stderr))\n",
......
......@@ -44,6 +44,10 @@ class Job(object):
"""Manage existing jobs but does not create new jobs
"""
_POSSIBLE_STATES = ('Launching', 'Resuming', 'Finishing', 'Terminated',
'Error', 'Running', 'Waiting', 'Hold', 'Suspended')
"""list all the possible state the job can take"""
def __init__(self, jobid):
"""
......@@ -122,10 +126,35 @@ class Job(object):
def __getitem__(self, key):
return self.stats.get(key, None)
def wait_until(self, until, refresh=1, silent=False):
def wait(self, states=('Terminated', 'Error'), timeout=None, refresh=1,
silent=False):
"""
:param str or tuple states: state or states we are waiting for
:param int or None timeout: if not none, second before the timeout.
:param int refresh: time (in second) between two observations of the job
state
:param silent: if False then write to stdout advancement ('.')
"""
# make sure _until will always be iterable (dea with str, bytes...)
_states = states
if isinstance(_states, (tuple, list, dict)) is False:
_states = (_states,)
for _state in _states:
if _state not in self._POSSIBLE_STATES:
raise ValueError('%s is not a valid state. Unable to wait for it' % _state)
_until = lambda: self.status in _states
waited_time = 0
newline = False
while not until():
while not _until():
sleep(refresh)
waited_time = waited_time + refresh
if timeout is not None:
if waited_time >= timeout:
logger.warning('wait on job %s has reach a timeout' % self.jobid)
return
if not silent:
sys.stdout.write('.')
sys.stdout.flush()
......@@ -133,21 +162,6 @@ class Job(object):
if newline:
print('')
def wait_states(self, states, **kwargs):
until = lambda: self.status in states
self.wait_until(until, **kwargs)
def wait_finished(self, **kwargs):
until = lambda: self.is_finished
self.wait_until(until, **kwargs)
def wait_started(self):
self.wait_states(['Running', 'Terminated', 'Error'])
def wait_needsresume(self, **kwargs):
until = lambda: self.needsresume
self.wait_until(until, **kwargs)
@property
def duration(self):
"""Time since scheduled
......
......@@ -84,7 +84,7 @@ class test_oarjob(unittest.TestCase):
jobdef,expected = self.definition(5,'immediate')
job = jobdef.submit()
self.assertEqual(job.definition,jobdef)
job.wait_finished(silent=True)
job.wait(silent=True)
self.assertEqual(job.exit_code,0)
self._check_output(job,expected)
......@@ -92,9 +92,9 @@ class test_oarjob(unittest.TestCase):
self.skipoar()
jobdef,expected = self.definition(5,'notimmediate')
job = jobdef.submit(hold=True)
job.wait_needsresume(silent=True)
job.wait_needsresume(states=('Hold', 'Suspended'), silent=True)
job.resume()
job.wait_finished(silent=True)
job.wait(silent=True)
self.assertEqual(job.exit_code,0)
self._check_output(job,expected)
......@@ -102,21 +102,21 @@ class test_oarjob(unittest.TestCase):
self.skipoar()
jobdef,expected = self.definition(60,'interrupt')
job = jobdef.submit()
job.wait_started(silent=True)
job.wait_started(states=('Running', 'Terminated', 'Error'), silent=True)
sleep(5)
job.interrupt()
job.wait_finished(silent=True)
job.wait(silent=True)
self.assertEqual(job.exit_code,None)
def test_suspend(self):
self.skipoar()
jobdef,expected = self.definition(60,'interrupt')
job = jobdef.submit()
job.wait_started(silent=True)
job.wait_started(states=('Running', 'Terminated', 'Error'), silent=True)
self.assertRaises(RuntimeError, job.suspend)
#job.wait_needsresume(silent=True)
#job.wait_needsresume(states=('Hold', 'Suspended'), silent=True)
#job.resume()
#job.wait_finished(silent=True)
#job.wait(silent=True)
#self.assertEqual(job.exit_code,0)
#self._check_output(job,expected)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment