Commit 7d1f81d4 authored by Wout De Nolf's avatar Wout De Nolf

Merge branch 'feat-oarproperties' into 'master'

Feat oarproperties

See merge request !2
parents 29c77a38 65430774
......@@ -34,7 +34,7 @@ Hello world (specify resources and postpone execution):
command='echo "Hello word"',resource=resource)
job = jobdef.submit(hold=True)
job.wait(states=('Hold', 'Suspended'))
job.wait(states='Hold')
# job is waiting for you to resume it
job.resume()
job.wait()
......
......@@ -33,6 +33,8 @@
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.mathjax',
'nbsphinx'
]
# Add any paths that contain templates here, relative to this directory.
......@@ -56,9 +58,9 @@ author = 'de nolf'
# built documents.
#
# The short X.Y version.
version = '0.1'
version = '0.0.1'
# The full version, including alpha/beta/rc tags.
release = '0.1'
release = '0.0.1'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
......@@ -70,7 +72,7 @@ language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This patterns also effect to html_static_path and html_extra_path
exclude_patterns = []
exclude_patterns = ['_build','_templates','_autosummary','.ipynb_checkpoints']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
......
......@@ -8,11 +8,17 @@ Welcome to oarpy's documentation!
.. toctree::
:maxdepth: 2
:caption: Contents:
modules.rst
:hidden:
modules
tutorials
:doc:`tutorials`
Some examples on how to use the library
:doc:`modules`
API documentation
Indices and tables
==================
......
Tutorials
=========
.. toctree::
:maxdepth: 3
tutorials/quickstart.rst
tutorials/oarjob.rst
This diff is collapsed.
......@@ -4,7 +4,12 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Hello world (minimal):"
"# Quick start\n",
"\n",
"## Hello world\n",
"OAR job that prints \"Hello world\" to the standard output.\n",
"\n",
"### Minimal"
]
},
{
......@@ -16,9 +21,10 @@
"name": "stdout",
"output_type": "stream",
"text": [
".......\n",
"Interrupted:\n",
"None\n"
"......................\n",
"Succes:\n",
"Hello word\n",
"\n"
]
}
],
......@@ -42,7 +48,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Hello world (specify resources and postpone execution):"
"### With resources and postponed execution"
]
},
{
......@@ -54,7 +60,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"........\n",
"......................................\n",
"Succes:\n",
"Hello word\n",
"\n"
......@@ -65,12 +71,12 @@
"from oarpy import oarjob\n",
"from oarpy.oarresource import Resource\n",
"\n",
"resource = Resource(core=1,walltime={'hours':1},gpu=False)\n",
"resource = Resource(core=1,walltime={'hours':1,'minutes':30},gpu=False)\n",
"jobdef = oarjob.JobFactory(name='helloworld',project='oarpy',\n",
" command='echo \"Hello word\"',resource=resource)\n",
"\n",
"job = jobdef.submit(hold=True)\n",
"job.wait(states=('Hold', 'Suspended'))\n",
"job.wait(states='Hold')\n",
"# job is waiting for you to resume it\n",
"job.resume()\n",
"job.wait()\n",
......@@ -88,6 +94,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Job management\n",
"\n",
"Find all jobs started in the last 5 minutes:"
]
},
......@@ -100,8 +108,8 @@
"name": "stdout",
"output_type": "stream",
"text": [
"[Job(1103715), Job(1103724), Job(1103725), Job(1103726), Job(1103727), Job(976220)]\n",
"set([(u'denolf', u'Terminated'), (u'ljacques', u'Error'), (u'piault', u'Running'), (u'sallazda', u'Running')])\n"
"[Job(1122759), Job(1122760), Job(1124796), Job(1125514), Job(1125515), Job(1125516), Job(976220)]\n",
"set([(u'denolf', u'Finishing'), (u'denolf', u'Error'), (u'in1081', u'Running'), (u'denolf', u'Terminated'), (u'ljacques', u'Error'), (u'in1096', u'Waiting')])\n"
]
}
],
......@@ -109,7 +117,7 @@
"from oarpy import oarjob\n",
"from oarpy import timeutils\n",
"\n",
"start = timeutils.add(timeutils.now(),minutes=-5)\n",
"start = timeutils.add(timeutils.now(),minutes=-1)\n",
"jobs = oarjob.search(start=start)\n",
"print(sorted(jobs))\n",
"print(set([(job['owner'],job.status) for job in jobs]))"
......@@ -141,10 +149,10 @@
" (((((gpu='YES') AND desktop_computing = 'NO') AND cluster = 'NICE') AND opsys = 'debian8') AND interactive = 'MIXED') AND drain='NO'\n",
"\n",
"Job resource:\n",
" -l host=1/core=16,walltime=16:00:00 -p gpu='YES'\n",
" -l nodes=1/core=16,walltime=16:00:00 -p \"gpu='YES' and drain='NO'\"\n",
"\n",
"Job definition:\n",
" -n Gecko_2_0p7um_2474_3031__001_.par --project default -d /mntdirect/_data_visitor/md1189/id17/GeckosHR/Gecko_2/Gecko_2_0p7um_2474_3031__001_/Slices -O OAR.Gecko_2_0p7um_2474_3031__001_.par.%jobid%.stdout -E OAR.Gecko_2_0p7um_2474_3031__001_.par.%jobid%.stderr -l host=1/core=16,walltime=16:00:00 -p gpu='YES' /mntdirect/_data_visitor/md1189/id17/GeckosHR/Gecko_2/Gecko_2_0p7um_2474_3031__001_/Slices//./tmpmd1189.sh\n",
" -n Gecko_2_0p7um_2474_3031__001_.par --project default -d /mntdirect/_data_visitor/md1189/id17/GeckosHR/Gecko_2/Gecko_2_0p7um_2474_3031__001_/Slices -O OAR.Gecko_2_0p7um_2474_3031__001_.par.%jobid%.stdout -E OAR.Gecko_2_0p7um_2474_3031__001_.par.%jobid%.stderr -l nodes=1/core=16,walltime=16:00:00 -p \"gpu='YES' and drain='NO'\" /mntdirect/_data_visitor/md1189/id17/GeckosHR/Gecko_2/Gecko_2_0p7um_2474_3031__001_/Slices//./tmpmd1189.sh\n",
"\n"
]
}
......@@ -155,9 +163,10 @@
"job = oarjob.Job(1103714)\n",
"if job.exists:\n",
" jobdef = job.definition\n",
" print('Initial request:\\n {}\\n'.format(job.stats['initial_request']))\n",
" print('Wanted resources:\\n {}\\n'.format(job.stats['wanted_resources']))\n",
" print('Properties:\\n {}\\n'.format(job.stats['properties']))\n",
" stats = job.stats\n",
" print('Initial request:\\n {}\\n'.format(stats['initial_request']))\n",
" print('Wanted resources:\\n {}\\n'.format(stats['wanted_resources']))\n",
" print('Properties:\\n {}\\n'.format(stats['properties']))\n",
" print('Job resource:\\n {}\\n'.format(jobdef.resource))\n",
" print('Job definition:\\n {}\\n'.format(jobdef))"
]
......@@ -166,7 +175,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Remove the log files of all finished jobs of a particular user and project:"
"Remove the log files of all succesfully finished jobs of a particular user and project:"
]
},
{
......@@ -191,14 +200,14 @@
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
"pygments_lexer": "ipython2",
"version": "2.7.9"
}
},
"nbformat": 4,
......
......@@ -55,7 +55,7 @@ class Job(object):
"""
self._jobid = jobid
self._fixed_stats = None
"""Used to store store stats when using the `fixed_stats` context
"""Used to store stats when using the `fixed_stats` context
manager"""
@property
......@@ -119,9 +119,12 @@ class Job(object):
@contextmanager
def fixed_stats(self):
self._fixed_stats = self.stats
first = not bool(self._fixed_stats)
if first:
self._fixed_stats = self.stats
yield
self._fixed_stats = None
if first:
self._fixed_stats = None
def __getitem__(self, key):
return self.stats.get(key, None)
......@@ -130,13 +133,15 @@ class Job(object):
silent=False):
"""
:param str or tuple states: state or states we are waiting for
:param int or None timeout: if not none, second before the timeout.
:param int refresh: time (in second) between two observations of the job
:param states: state or states we are waiting for
:type states: str or tuple
:param timeout: if not none, second before the timeout.
:type timeout: int or None
:param int refresh: time (in seconds) between two observations of the job
state
:param silent: if False then write to stdout advancement ('.')
"""
# make sure _until will always be iterable (dea with str, bytes...)
# make sure _until will always be iterable (deal with str, bytes...)
_states = states
if isinstance(_states, (tuple, list, dict)) is False:
_states = (_states,)
......@@ -163,16 +168,32 @@ class Job(object):
print('')
@property
def duration(self):
"""Time since scheduled
def runtime(self):
"""Effective execution time, excluding queue time
"""
return self._time_diff('scheduledStart', 'stopTime')
dt = self._time_diff('stopTime', 'startTime')
return max(dt, datetime.timedelta(seconds=0))
@property
def runtime(self):
"""Time since started
def time_scheduled(self):
"""Time this job was scheduled, excluding queue and runtime
"""
dt = self._time_diff('startTime', 'scheduledStart')
return max(dt, datetime.timedelta(seconds=0))
@property
def time_enqueued(self):
"""Time this job was enqueue (not scheduled for execution)
"""
return self._time_diff('startTime', 'stopTime')
dt = self._time_diff('scheduledStart', 'submissionTime')
return max(dt, datetime.timedelta(seconds=0))
@property
def time_to_start(self):
"""Time until the job starts
"""
dt = self._time_diff('startTime', timeutils.now())
return max(dt, datetime.timedelta(seconds=0))
@property
def stderr_file(self):
......@@ -198,7 +219,8 @@ class Job(object):
else:
return os.path.join(self.working_directory, filename)
def _std_read(self, filename):
@staticmethod
def _std_read(filename):
try:
with open(filename, mode='r') as f:
return f.read()
......@@ -272,18 +294,13 @@ class Job(object):
@property
def resource(self):
with self.fixed_stats():
initial_request = self['initial_request']
wanted_resources = self['wanted_resources']
properties = self['properties']
if initial_request:
cli = initial_request
cmd = self['initial_request']
if cmd:
properties = None
else:
cli = wanted_resources
res = oarresource.Resource.from_cli(cli)
for s in [wanted_resources, properties]:
if "gpu='YES'" in s:
res.gpu = True
return res
cmd = self['wanted_resources']
properties = self['properties']
return oarresource.Resource.from_cli(cmd, properties)
def __str__(self):
try:
......@@ -293,7 +310,7 @@ class Job(object):
for key in ['name', 'project', 'state', 'owner']:
lst.append('{} = {}'.format(key, stats[key]))
lst.append('runtime = {}'.format(self.runtime))
return 'Job({})\n '.format(self.jobid)+'\n '.join(lst)
return 'Job({})\n '.format(self.jobid) + '\n '.join(lst)
except RuntimeError:
return 'Job(non existing {})'.format(self.jobid)
......@@ -335,7 +352,8 @@ class Job(object):
kwargs['resource'] = self.resource
return JobFactory(**kwargs)
def _parse_stats(self, stats):
@staticmethod
def _parse_stats(stats):
# Dates are in local timezone
for timekey in ['startTime', 'stopTime', 'scheduledStart', 'submissionTime']:
value = stats.get(timekey, 0)
......@@ -348,17 +366,32 @@ class Job(object):
if value:
stats['walltime'] = datetime.timedelta(seconds=int(value))
def _time_diff(self, start, end):
stats = self.stats
start = stats[start]
if start:
end = stats[end]
if end:
return end-start
else:
return timeutils.now() - start
def _time_diff(self, end, start):
"""
:param end: None (now)
:type end: str or datetime or None
:param start: None (now)
:type start: str or datetime or None
:return timedelta:
"""
with self.fixed_stats():
return self._get_time(end) - self._get_time(start)
def _get_time(self, tm):
"""
:param tm: None (now)
:type tm: str or datetime or None
:return datetime:
"""
if isinstance(tm, datetime.datetime):
return tm
else:
return datetime.timedelta(seconds=0)
tm = self[tm]
if not tm:
tm = timeutils.now()
return tm
def _raise_if_error(self, err, code, msg):
if code:
......@@ -392,18 +425,41 @@ class JobFactory(object):
def __init__(self, command=None, resource=None, name=None, project=None,
working_directory=None, log_directory=None, log_base=None):
self._name = None
self._log_base = None
self.command = command
self.resource = resource
self.name = name
self.project = project
self.working_directory = working_directory
self.log_directory = log_directory
if log_base:
self.log_base = log_base
self.log_base = log_base
@property
def name(self):
return self._name
@name.setter
def name(self, value):
self._name = value
self._default_name()
@property
def log_base(self):
return self._log_base
@log_base.setter
def log_base(self, value):
if value:
self._log_base = value
else:
self.log_base = '%jobname%.%jobid%'
if not self.name and '%jobname%' in self.log_base:
self.name = 'OAR'
self._log_base = '%jobname%.%jobid%'
self._default_name()
def _default_name(self):
if not self.name and self.log_base:
if '%jobname%' in self.log_base:
self.name = 'OAR'
@property
def cli_arguments(self):
......@@ -466,8 +522,18 @@ class JobFactory(object):
return Job(jobid)
def search(name=None, project=None, owner=None, start=None, end=None,
state=None, **kwargs):
def search(name=None, project=None, owner=None, state=None,
start=None, end=None, **properties):
"""
:param str name:
:param str project:
:param str owner:
:param datetime start:
:param datetime end:
:param str state:
:return list: list of jobs
"""
# https://github.com/oar-team/oar/blob/2.5/sources/core/database/mysql_structure.sql
sqlquery = []
if name:
......@@ -483,8 +549,8 @@ def search(name=None, project=None, owner=None, start=None, end=None,
sqlquery.append("start_time>='{}'".format(start))
if end:
end = timeutils.totimestamp(end)
sqlquery.append("start_time<='{}'".format(end))
for k, (op, v) in kwargs.items():
sqlquery.append("stop_time<='{}'".format(end))
for k, (op, v) in properties.items():
sqlquery.append("{}{}'{}'".format(k, op, v))
if not sqlquery:
return []
......
......@@ -24,7 +24,6 @@
__authors__ = ["W. De Nolf"]
__license__ = "MIT"
import datetime
import re
from . import oarshell
......@@ -41,13 +40,20 @@ def str2walltime(s):
return datetime.timedelta(hours=int(h), minutes=int(m), seconds=int(s))
class Resource(object):
property_pattern = re.compile('(\w+)([>=<]+)\'?([\w\d]+)\'?')
# Includes the default ones (space around operator):
# property_pattern = re.compile('(\w+) *([>=<]+) *\'?([\w\d]+)\'?')
class Resource(object):
def __init__(self, host=None, nodes=None, cpu=None, core=None, gpu=False,
walltime=None, mem_core_mb=None):
walltime=None, mem_core_mb=None, **properties):
"""
Custom properties can be defined: for example cpu_vendor=('=',INTEL)"
:param int host: number of hosts to be used (Optional)
:param int host: number of nodes to be used (Optional)
:param int nodes: number of nodes to be used (Optional)
:param int cpu: number of cpu's per node (Optional)
:param int core: number of cores per cpu (Optional)
......@@ -58,13 +64,15 @@ class Resource(object):
:type walltime: timedelta or dict or num
:param int mem_core_mb: required memory per core in MB (Optional)
"""
self.host = host
if not nodes:
nodes = host
self.nodes = nodes
self.cpu = cpu
self.core = core
self.gpu = gpu
self.walltime = walltime
self.mem_core_mb = mem_core_mb
self.properties = properties
@property
def walltime(self):
......@@ -82,12 +90,12 @@ class Resource(object):
@property
def memory(self):
return self.cores*self.mem_core_mb
return self.cores * self.mem_core_mb
@property
def cores(self):
n = 1
for m in [self.host, self.nodes, self.cpu, self.core]:
for m in [self.nodes, self.cpu, self.core]:
if m:
n *= m
return n
......@@ -95,8 +103,6 @@ class Resource(object):
@property
def _cli_resource(self):
resources = []
if self.host:
resources.append('host={}'.format(self.host))
if self.nodes:
resources.append('nodes={}'.format(self.nodes))
if self.cpu:
......@@ -119,6 +125,8 @@ class Resource(object):
properties.append("gpu='YES'")
if self.mem_core_mb:
properties.append("mem_core_mb>={:d}".format(self.mem_core_mb))
for k, (op, v) in self.properties.items():
properties.append("{}{}'{}'".format(k, op, v))
cliarg = ' and '.join(properties)
return cliarg
......@@ -138,11 +146,10 @@ class Resource(object):
return oarshell.cli_args2str(*self.cli_arguments)
@classmethod
def from_cli(cls, cmd):
def from_cli(cls, cmd, properties=None):
kwargs = {}
cli_arguments = list(oarshell.cli_str2args(cmd))
arg = oarshell._cli_getarg(cli_arguments, '-l')
arg = oarshell.cli_getarg(cli_arguments, '-l')
if arg:
for key in ['host', 'nodes', 'cpu', 'core']:
match = re.search('{}=(\d+)'.format(key), arg)
......@@ -151,15 +158,18 @@ class Resource(object):
match = re.search('walltime=(\d+:\d+:\d+)', arg)
if match:
kwargs['walltime'] = str2walltime(match.groups()[0])
arg = oarshell._cli_getarg(cli_arguments, '-p')
arg = oarshell.cli_getarg(cli_arguments, '-p')
if not arg and properties:
arg = properties
if arg:
match = re.search('gpu=\'?YES', arg)
if match:
kwargs['gpu'] = True
match = re.search('mem_core_mb[>=<]+(\d+)', arg)
if match:
kwargs['mem_core_mb'] = int(match.groups()[0])
for match in property_pattern.finditer(arg):
k, op, v = match.groups()
if k == 'gpu':
kwargs['gpu'] = v == 'YES'
elif k == 'mem_core_mb':
kwargs['mem_core_mb'] = int(v)
else:
kwargs[k] = op, v
return cls(**kwargs)
def __eq__(self, other):
......
......@@ -71,7 +71,7 @@ def oarinstalled():
def executejson(cmd, *args):
"""
:param int jobid:
:param str cmd:
:return tuple: out(dict), err(str or None), exitcode(int)
"""
out, err, code = execute(cmd, '-J', *args)
......@@ -91,7 +91,6 @@ def executejson(cmd, *args):
def oarstat(*args):
"""
:param int jobid:
:return tuple: out(dict), err(str or None), exitcode(int)
"""
out, err, code = execute('oarstat', '-J', *args)
......@@ -120,7 +119,6 @@ def oarjobstat(jobid, *args):
def oarsub(*args):
"""
:param int jobid:
:return tuple: out(str or None), err(str or None), exitcode(int)
"""
out, err, code = execute('oarsub', '-J', *args)
......@@ -194,7 +192,7 @@ def cli_str2args(cmd):
return tuple(shlex.split(cmd))
def _cli_getarg(cli_arguments, flag):
def cli_getarg(cli_arguments, flag):
if flag in cli_arguments:
i = cli_arguments.index(flag)
ret = cli_arguments[i+1]
......
......@@ -26,6 +26,7 @@ from . import test_oarshell
from . import test_oarresource
from . import test_oarjob
def test_suite():
"""Test suite including all test suites"""
testSuite = unittest.TestSuite()
......@@ -33,7 +34,8 @@ def test_suite():
testSuite.addTest(test_oarresource.test_suite())
testSuite.addTest(test_oarjob.test_suite())
return testSuite
if __name__ == '__main__':
import sys
......
......@@ -32,14 +32,14 @@ from .. import oarjob
from ..oarresource import Resource
from ..oarshell import oarinstalled
class test_oarjob(unittest.TestCase):
class test_oarjob(unittest.TestCase):
def setUp(self):
self.working_directory = tempfile.mkdtemp(dir=os.getcwd())
def tearDown(self):
shutil.rmtree(self.working_directory)
def skipoar(self):
if not oarinstalled():
raise unittest.SkipTest("oar cli tools not available")
......@@ -54,31 +54,31 @@ class test_oarjob(unittest.TestCase):
self.assertFalse(job.is_waiting)
self.assertFalse(job.is_intermediate)
self.assertFalse(job.needsresume)
self.assertEqual(job['state'],job.status)
self.assertEqual(job['state'], job.status)
def test_jobfactory(self):
self.skipoar()
self.assertTrue(oarjob.JobFactory())
self.assertTrue(oarjob.JobFactory(command='ls -all'))
res = Resource(nodes=1, cpu=2, core=2, gpu=True, mem_core_mb=8000)
info = oarjob.JobFactory(resource=res, command='ls -all', log_base='test')
cli = "-O test.stdout -E test.stderr -l nodes=1/cpu=2/core=2 -p \"gpu='YES' and mem_core_mb>=8000\" \"ls -all\""
self.assertEqual(cli, info.cli_string)
def definition(self, seconds, name):
command = 'python -c "from time import sleep\nfor i in range({}):\n print(i)\n sleep(1)"'
resource = Resource(core=1,walltime={'seconds':seconds*3})
resource = Resource(core=1, walltime={'seconds': seconds * 3})
jobdef = oarjob.JobFactory(name=name, project='oarpy',
resource=resource, command=command.format(seconds),
working_directory=self.working_directory)
output = list(range(seconds))
return jobdef,output
return jobdef, output
def _check_output(self, job, expected):
output = [int(i) for i in job.stdout.split('\n') if i]
self.assertEqual(output, expected)
def test_immediate(self):
self.skipoar()
jobdef, expected = self.definition(5, 'immediate')
......@@ -87,7 +87,7 @@ class test_oarjob(unittest.TestCase):
job.wait(silent=True)
self.assertEqual(job.exit_code, 0)
self._check_output(job, expected)
def test_notimmediate(self):
self.skipoar()
jobdef, expected = self.definition(5, 'notimmediate')
......@@ -97,29 +97,30 @@ class test_oarjob(unittest.TestCase):
job.wait(silent=True)
self.assertEqual(job.exit_code, 0)
self._check_output(job, expected)
def test_interrupt(self):
self.skipoar()
jobdef,expected = self.definition(60, 'interrupt')
jobdef, expected = self.definition(60, 'interrupt')
job = jobdef.submit()
job.wait(states=('Running', 'Terminated', 'Error'), silent=True)
sleep(5)
job.interrupt()
job.wait(silent=True)
self.assertEqual(job.exit_code,None)
self.assertEqual(job.exit_code, None)
def test_suspend(self):
self.skipoar()
jobdef, expected = self.definition(60, 'interrupt')
job = jobdef.submit()
job.wait(states=('Running', 'Terminated', 'Error'), silent=True)
self.assertRaises(RuntimeError, job.suspend)
#job.wait_needsresume(states=('Hold', 'Suspended'), silent=True)
#job.resume()
#job.wait(silent=True)
#self.assertEqual(job.exit_code,0)
#self._check_output(job,expected)
# job.wait(states='Hold', silent=True)
# job.resume()
# job.wait(silent=True)
# self.assertEqual(job.exit_code,0)
# self._check_output(job,expected)
def test_suite():
"""Test suite including all test suites"""
testSuite = unittest.TestSuite()
......@@ -130,7 +131,8 @@ def test_suite():
testSuite.addTest(test_oarjob("test_interrupt"))
testSuite.addTest(test_oarjob("test_suspend"))
return testSuite
if __name__ == '__main__':
import sys
......