Commit c15e5ecf authored by Wout De Nolf's avatar Wout De Nolf

[oarjob] get definition from job

parent c0cf3eb7
......@@ -3,6 +3,7 @@ stages:
.test:
script:
- python -m pip install -r requirements.txt --user
- python -m oarpy.tests.test_all
test-2.7:
......
......@@ -20,6 +20,7 @@ Hello world (minimal):
print('Interrupted:\n{}'.format(job.stdout))
else:
print('Succes:\n{}'.format(job.stdout))
job.remove_logs()
Hello world (specify resources and postpone execution):
......@@ -44,8 +45,9 @@ Hello world (specify resources and postpone execution):
print('Interrupted:\n{}'.format(job.stdout))
else:
print('Succes:\n{}'.format(job.stdout))
Status and owners of all jobs started in the last 5 minutes:
job.remove_logs()
Find all jobs started in the last 5 minutes:
.. code-block:: python
......@@ -57,13 +59,3 @@ Status and owners of all jobs started in the last 5 minutes:
print(set([(job['owner'],job.status) for job in jobs]))
print('Job statistics: {}'.format(jobs[0].stats.keys()))
Remove the log files of all finished jobs of a particular user and project:
.. code-block:: python
from oarpy import oarjob
jobs = oarjob.search(owner='rniceuser',project='oarpy',state='Terminated')
for job in jobs:
job.remove_logs()
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Hello world (minimal):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from oarpy import oarjob\n",
"\n",
"jobdef = oarjob.JobDefinition(command='echo \"Hello word\"')\n",
"job = jobdef.submit()\n",
"job.wait_finished()\n",
"\n",
"if job.exit_code:\n",
" print('Failed:\\n{}'.format(job.stderr))\n",
"elif job.exit_code is None:\n",
" print('Interrupted:\\n{}'.format(job.stdout))\n",
"else:\n",
" print('Succes:\\n{}'.format(job.stdout))\n",
"job.remove_logs()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Hello world (specify resources and postpone execution):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from oarpy import oarjob\n",
"from oarpy.oarresource import Resource\n",
"\n",
"resource = Resource(core=1,walltime={'hours':1},gpu=False)\n",
"jobdef = oarjob.JobDefinition(name='helloworld',project='oarpy',\n",
" command='echo \"Hello word\"',resource=resource)\n",
"\n",
"job = jobdef.submit(hold=True)\n",
"job.wait_needsresume()\n",
"# job is waiting for you to resume it\n",
"job.resume()\n",
"job.wait_finished()\n",
"\n",
"if job.exit_code:\n",
" print('Failed:\\n{}'.format(job.stderr))\n",
"elif job.exit_code is None:\n",
" print('Interrupted:\\n{}'.format(job.stdout))\n",
"else:\n",
" print('Succes:\\n{}'.format(job.stdout))\n",
"job.remove_logs()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Find all jobs started in the last 5 minutes:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from oarpy import oarjob\n",
"from oarpy import timeutils\n",
"\n",
"start = timeutils.add(timeutils.now(),minutes=-5)\n",
"jobs = oarjob.search(start=start)\n",
"print(jobs)\n",
"print(set([(job['owner'],job.status) for job in jobs]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Retrieve job definition (can be used to resubmit a job):"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Initial request:\n",
" \n",
"Wanted resources:\n",
" -l \"{(gpu='YES') AND type = 'default'}/host=1,walltime=24:0:0\" \n",
"Properties:\n",
" ((((desktop_computing = 'NO') AND cluster = 'NICE') AND opsys = 'debian8') AND interactive = 'MIXED') AND drain='NO'\n",
"Job resource:\n",
" -l walltime=24:00:00 -p gpu='YES'\n",
"Job definition:\n",
" -n sample_031_tero --project pyhst2 -d /data/visitor/ma3931/id15/data/absct/sample_031_tero -O OAR.%jobname%.%jobid%.stdout -E OAR.%jobname%.%jobid%.stderr -l walltime=24:00:00 -p gpu='YES' \"./sample_031_tero.oar sample_031_tero0001.par\"\n"
]
}
],
"source": [
"from oarpy import oarjob\n",
"\n",
"job = oarjob.Job(110327)\n",
"jobdef = job.definition\n",
"print('Initial request:\\n {}'.format(job.stats['initial_request']))\n",
"print('Wanted resources:\\n {}'.format(job.stats['wanted_resources']))\n",
"print('Properties:\\n {}'.format(job.stats['properties']))\n",
"print('Job resource:\\n {}'.format(jobdef.resource))\n",
"print('Job definition:\\n {}'.format(jobdef))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Remove the log files of all finished jobs of a particular user and project:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from oarpy import oarjob\n",
"jobs = oarjob.search(owner='testuser',project='oarpy',state='Terminated')\n",
"for job in jobs:\n",
" job.remove_logs()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
......@@ -151,31 +151,48 @@ class Job(object):
@property
def stderr_file(self):
return self['stderr_file']
return self._std_file('stderr_file')
@property
def stdout_file(self):
return self['stdout_file']
return self._std_file('stdout_file')
@property
def stderr(self):
with open(self.stderr_file,mode='r') as f:
return f.read()
return self._std_read(self.stderr_file)
@property
def stdout(self):
with open(self.stdout_file,mode='r') as f:
return f.read()
return self._std_read(self.stdout_file)
def _std_file(self,filename):
with self.fixed_stats():
filename = self[filename]
if os.path.dirname(filename):
return filename
else:
return os.path.join(self.working_directory,filename)
def _std_read(self,filename):
try:
with open(filename,mode='r') as f:
return f.read()
except IOError as e:
if e.errno == errno.ENOENT:
return None
else:
raise e
def remove_logs(self):
for f in [self.stderr_file,self.stdout_file]:
try:
os.remove(f)
except OSError as e:
if e.errno == errno.ENOENT:
pass
else:
raise e
with self.fixed_stats():
for f in [self.stderr_file,self.stdout_file]:
try:
os.remove(f)
except OSError as e:
if e.errno == errno.ENOENT:
pass
else:
raise e
@property
def exit_code(self):
......@@ -186,6 +203,62 @@ class Job(object):
"""
return self['exit_code']
@property
def name(self):
return self['name']
@property
def project(self):
return self['project']
@property
def command(self):
return self['command']
@property
def working_directory(self):
return self['launchingDirectory']
@property
def log_directory(self):
with self.fixed_stats():
path = os.path.dirname(self.stdout_file)
if path:
return path
else:
return self.working_directory
@property
def log_base(self):
with self.fixed_stats():
base = os.path.basename(self.stdout_file)
base = base.split('.')
if base[-1]=='stdout':
base.pop()
name = self.name
base = ['%jobname%' if s==name else s
for s in base if s]
name = str(self.jobid)
base = ['%jobid%' if s==name else s
for s in base]
return '.'.join(base)
@property
def resource(self):
with self.fixed_stats():
initial_request = self['initial_request']
wanted_resources = self['wanted_resources']
properties = self['properties']
if initial_request:
cli = initial_request
else:
cli = wanted_resources
res = oarresource.Resource.from_cli(cli)
for s in [wanted_resources,properties]:
if "gpu='YES'" in s:
res.gpu = True
return res
def __str__(self):
try:
with self.fixed_stats():
......@@ -219,6 +292,20 @@ class Job(object):
stop = interrupt
@property
def definition(self):
with self.fixed_stats():
kwargs = {}
kwargs['command'] = self.command
kwargs['name'] = self.name
kwargs['project'] = self.project
kwargs['working_directory'] = self.working_directory
if self.log_directory!=self.working_directory:
kwargs['log_directory'] = self.log_directory
kwargs['log_base'] = self.log_base
kwargs['resource'] = self.resource
return JobDefinition(**kwargs)
def _parse_stats(self,stats):
#Dates are in local timezone
for timekey in ['startTime','stopTime','scheduledStart','submissionTime']:
......@@ -267,6 +354,8 @@ class JobDefinition(object):
self.log_base = log_base
else:
self.log_base = '%jobname%.%jobid%'
if not self.name and '%jobname%' in self.log_base:
self.name = 'OAR'
@property
def cli_arguments(self):
......
......@@ -25,6 +25,15 @@ import datetime
import re
from . import oarshell
def walltime2str(tdelta):
h, rem = divmod(int(round(tdelta.total_seconds())), 3600)
m, s = divmod(rem, 60)
return '{:02d}:{:02d}:{:02d}'.format(h,m,s)
def str2walltime(s):
h,m,s = s.split(':')
return datetime.timedelta(hours=int(h), minutes=int(m), seconds=int(s))
class Resource(object):
def __init__(self,nodes=None,cpu=None,core=None,gpu=False,
......@@ -83,7 +92,11 @@ class Resource(object):
resources.append('core={}'.format(self.core))
cliarg = '/'.join(resources)
if self.walltime:
cliarg = '{},walltime={}'.format(cliarg,self.walltime)
walltime = walltime2str(self.walltime)
if cliarg:
cliarg = '{},walltime={}'.format(cliarg,walltime)
else:
cliarg = 'walltime={}'.format(walltime)
return cliarg
@property
......@@ -122,7 +135,9 @@ class Resource(object):
match = re.search('{}=(\d+)'.format(key),arg)
if match:
kwargs[key] = match.groups()[0]
match = re.search('walltime=(\d+:\d+:\d+)',arg)
if match:
kwargs['walltime'] = str2walltime(match.groups()[0])
arg = oarshell._cli_getarg(cli_arguments,'-p')
if arg:
match = re.search('gpu=\'?YES',arg)
......
......@@ -23,8 +23,10 @@
# THE SOFTWARE.
import unittest
import errno
import os
import tempfile
from time import sleep
import shutil
from .. import oarjob
from ..oarresource import Resource
......@@ -32,6 +34,12 @@ from ..oarshell import oarinstalled
class test_oarjob(unittest.TestCase):
def setUp(self):
self.working_directory = tempfile.mkdtemp(dir=os.getcwd())
def tearDown(self):
shutil.rmtree(self.working_directory)
def skipoar(self):
if not oarinstalled():
raise unittest.SkipTest("oar cli tools not available")
......@@ -63,7 +71,7 @@ class test_oarjob(unittest.TestCase):
resource = Resource(core=1,walltime={'seconds':seconds*3})
jobdef = oarjob.JobDefinition(name=name,project='oarpy',
resource=resource,command=command.format(seconds),
log_directory='~/.local/oarpy')
working_directory=self.working_directory)
output = list(range(seconds))
return jobdef,output
......@@ -75,6 +83,7 @@ class test_oarjob(unittest.TestCase):
self.skipoar()
jobdef,expected = self.definition(5,'immediate')
job = jobdef.submit()
self.assertEqual(job.definition,jobdef)
job.wait_finished(silent=True)
self.assertEqual(job.exit_code,0)
self._check_output(job,expected)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment