Commit 342db144 authored by Wout De Nolf's avatar Wout De Nolf

[oarjob] start, stop, status

parent 526bb3c7
This diff is collapsed.
This diff is collapsed.
......@@ -22,11 +22,13 @@
# THE SOFTWARE.
import datetime
import re
from . import oarshell
class Resource(object):
def __init__(self,nodes=None,cpu=None,core=None,gpu=False,
walltime=None,memory=None):
walltime=None,mem_core_mb=None):
"""
Args:
nodes(Optional(int)): number of nodes to be used
......@@ -34,14 +36,15 @@ class Resource(object):
core(Optional(int)): number of cores per cpu
gpu(Optional(bool)): need a gpu or not
walltime(Optional(timedelta|dict|num)): hours when a number
memory(Optional(int)): required memory per core in MB
dict keys: days, seconds, microseconds, milliseconds, minutes, hours, weeks
mem_core_mb(Optional(int)): required memory per core in MB
"""
self.nodes = nodes
self.cpu = cpu
self.core = core
self.gpu = gpu
self.walltime = walltime
self.memory = memory
self.mem_core_mb = mem_core_mb
@property
def walltime(self):
......@@ -61,10 +64,6 @@ class Resource(object):
def memory(self):
return self.cores*self.mem_core_mb
@memory.setter
def memory(self,value):
self.mem_core_mb = value
@property
def cores(self):
n = 1
......@@ -84,7 +83,7 @@ class Resource(object):
resources.append('core={}'.format(self.core))
cliarg = '/'.join(resources)
if self.walltime:
cliarg = '{},walltime={}'.format(clsarg,self.walltime)
cliarg = '{},walltime={}'.format(cliarg,self.walltime)
return cliarg
@property
......@@ -105,11 +104,44 @@ class Resource(object):
args += ['-l',add]
add = self._cli_properties
if add:
args += ['-p','"{}"'.format(add)]
args += ['-p',add]
return args
@property
def cli_string(self):
return oarshell.cli_args2str(*self.cli_arguments)
@classmethod
def from_cli(cls,cmd):
kwargs = {}
cli_arguments = list(oarshell.cli_str2args(cmd))
arg = oarshell._cli_getarg(cli_arguments,'-l')
if arg:
for key in ['nodes','cpu','core']:
match = re.search('{}=(\d+)'.format(key),arg)
if match:
kwargs[key] = match.groups()[0]
arg = oarshell._cli_getarg(cli_arguments,'-p')
if arg:
match = re.search('gpu=\'?YES',arg)
if match:
kwargs['gpu'] = True
match = re.search('mem_core_mb[>=<]+(\d+)',arg)
if match:
kwargs['mem_core_mb'] = int(match.groups()[0])
return cls(**kwargs)
def __eq__(self,other):
return self.cli_arguments==other.cli_arguments
def __ne__(self,other):
return self.cli_arguments!=other.cli_arguments
def __str__(self):
return ' '.join(self.cli_arguments)
return self.cli_string
def __nonzero__(self):
return self.__bool__()
......
......@@ -25,6 +25,7 @@ import os
import subprocess
import json
import logging
import shlex
logger = logging.getLogger(__name__)
......@@ -32,6 +33,8 @@ def _execute(args,**kwargs):
logger.debug('Subprocess: {}'.format(' '.join(args)))
proc = subprocess.Popen(args, **kwargs)
out,err = proc.communicate()
logger.debug('stdout: {}'.format(out))
logger.debug('stderr: {}'.format(err))
return out,err,proc.returncode
def installed(*args):
......@@ -47,8 +50,7 @@ def execute(*args):
try:
return _execute(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as e:
if e.errno == os.errno.ENOENT:
return None,None
return None,None,e.errno
def oarinstalled():
return installed('oarsub', '--version')
......@@ -65,9 +67,13 @@ def executejson(cmd,*args):
"""
out,err,code = execute(cmd,'-J',*args)
if out:
start = out.index('{')
stop = out.index('}')
out = json.loads(out[start:stop+1])
try:
start = out.index('{')
stop = out.index('}')
except ValueError:
out = {}
else:
out = json.loads(out[start:stop+1])
else:
out = {}
return out,err,code
......@@ -120,9 +126,13 @@ def oarsub(*args):
"""
out,err,code = execute('oarsub','-J',*args)
if out:
start = out.index('{')
stop = out.index('}')
out = json.loads(out[start:stop+1]).get('job_id',0)
try:
start = out.index('{')
stop = out.index('}')
except ValueError:
out = 0
else:
out = json.loads(out[start:stop+1]).get('job_id',0)
else:
out = 0
return out,err,code
......@@ -131,7 +141,7 @@ def jobstatus(jobid):
"""
None: means job is not registered
Hold: not scheduled (needs to be resumed)
Waiting: scheduled
Waiting: scheduled for execution
Suspended: running process is suspended (needs to be resumed)
Launching: process is starting
Running: process is running
......@@ -158,7 +168,7 @@ def jobstats(jobid):
jobid(int)
Returns:
out(dict):
out(str|None):
err(str|None)
exitcode(int)
"""
......@@ -166,3 +176,34 @@ def jobstats(jobid):
if not isinstance(out,dict):
out = {}
return out,err,code
def jobresume(jobid):
return execute('oarresume',str(jobid))
def jobhold(jobid):
# Not permissions to hold running job with the -r option
return execute('oarhold',str(jobid))
def jobdel(jobid,signal=None):
args = []
if signal:
args = ['-s',signal]
return execute('oardel',str(jobid),*args)
def cli_args2str(*cli_arguments):
return subprocess.list2cmdline(cli_arguments)
def cli_str2args(cmd):
return tuple(shlex.split(cmd))
def _cli_getarg(cli_arguments,flag):
if flag in cli_arguments:
i = cli_arguments.index(flag)
ret = cli_arguments[i+1]
if ret.startswith('-'):
return ''
else:
return ret
else:
return ''
......@@ -22,14 +22,14 @@
# THE SOFTWARE.
import unittest
from . import test_shell
from . import test_oarshell
from . import test_oarresource
from . import test_oarjob
def test_suite():
"""Test suite including all test suites"""
testSuite = unittest.TestSuite()
testSuite.addTest(test_shell.test_suite())
testSuite.addTest(test_oarshell.test_suite())
testSuite.addTest(test_oarresource.test_suite())
testSuite.addTest(test_oarjob.test_suite())
return testSuite
......
......@@ -25,52 +25,101 @@
import unittest
import errno
from time import sleep
from .. import oarresource
from .. import oarjob
from pprint import pprint
import logging
logging.basicConfig(level=logging.DEBUG)
from .. import oarjob
from ..oarresource import Resource
from ..oarshell import oarinstalled
class test_oarjob(unittest.TestCase):
def setUp(self):
res = oarresource.Resource(core=1)
command = 'for i in $(seq 1 5);do sleep 1; echo line ${i}; done'
self.description = oarjob.JobDescription(name=self.__class__.__name__,project='oarpy',
command=command,resource=res)
def skipoar(self):
if not oarinstalled():
raise unittest.SkipTest("oar cli tools not available")
def test_jobdescription(self):
self.assertFalse(oarjob.JobDescription())
self.assertTrue(oarjob.JobDescription(command='ls'))
res = oarresource.Resource(nodes=1,cpu=2,core=2,gpu=True,memory=8000)
info = oarjob.JobDescription(resource=res,command='ls')
cli = "-l nodes=1/cpu=2/core=2 -p \"gpu='YES' and mem_core_mb>=8000\" ls"
self.assertEqual(cli,str(info))
def test_status(self):
job = oarjob.Job(description=self.description)
self.assertFalse(job.exists)
job = oarjob.Job(jobid=1)
self.assertTrue(job.exists)
def test_launch(self):
job = oarjob.Job(description=self.description)
#job.launch()
self.skipoar()
job = oarjob.Job(1)
with job.fixed_stats():
self.assertTrue(job.exists)
self.assertTrue(job.finished)
self.assertFalse(job.running)
self.assertFalse(job.waiting)
self.assertFalse(job.intermediate)
self.assertFalse(job.needsresume)
self.assertEqual(job['state'],job.status)
def test_jobdefinition(self):
self.skipoar()
self.assertTrue(oarjob.JobDefinition())
self.assertTrue(oarjob.JobDefinition(command='ls -all'))
#while job.finished:
# print job.duration,job.runtime
# pprint(job.stats)
# sleep(1)
res = Resource(nodes=1,cpu=2,core=2,gpu=True,mem_core_mb=8000)
info = oarjob.JobDefinition(resource=res,command='ls -all',log_base='test')
cli = "-O test.stdout -E test.stderr -l nodes=1/cpu=2/core=2 -p \"gpu='YES' and mem_core_mb>=8000\" \"ls -all\""
self.assertEqual(cli,info.cli_string)
def definition(self,seconds,name):
command = 'python -c "from time import sleep\nfor i in range({}):\n print(i)\n sleep(1)"'
resource = Resource(core=1,walltime={'seconds':seconds*3})
jobdef = oarjob.JobDefinition(name=name,project='oarpy',
resource=resource,command=command.format(seconds),
log_directory='~/.local/oarpy')
output = range(seconds)
return jobdef,output
def _check_output(self,job,expected):
output = [int(i) for i in job.stdout.split('\n') if i]
self.assertEqual(output,expected)
def test_immediate(self):
self.skipoar()
jobdef,expected = self.definition(5,'immediate')
job = jobdef.submit()
job.wait_finished(silent=True)
self.assertEqual(job.exit_code,0)
self._check_output(job,expected)
def test_notimmediate(self):
self.skipoar()
jobdef,expected = self.definition(5,'notimmediate')
job = jobdef.submit(hold=True)
job.wait_needsresume(silent=True)
job.resume()
job.wait_finished(silent=True)
self.assertEqual(job.exit_code,0)
self._check_output(job,expected)
def test_interrupt(self):
self.skipoar()
jobdef,expected = self.definition(60,'interrupt')
job = jobdef.submit()
job.wait_started(silent=True)
sleep(5)
job.interrupt()
job.wait_finishedsilent=True()
self.assertEqual(job.exit_code,None)
def test_suspend(self):
self.skipoar()
jobdef,expected = self.definition(60,'interrupt')
job = jobdef.submit()
job.wait_started(silent=True)
self.assertRaises(RuntimeError, job.suspend)
#job.wait_needsresume(silent=True)
#job.resume()
#job.wait_finished(silent=True)
#self.assertEqual(job.exit_code,0)
#self._check_output(job,expected)
def test_suite():
"""Test suite including all test suites"""
testSuite = unittest.TestSuite()
testSuite.addTest(test_oarjob("test_status"))
testSuite.addTest(test_oarjob("test_jobdescription"))
testSuite.addTest(test_oarjob("test_launch"))
testSuite.addTest(test_oarjob("test_jobdefinition"))
testSuite.addTest(test_oarjob("test_immediate"))
testSuite.addTest(test_oarjob("test_notimmediate"))
testSuite.addTest(test_oarjob("test_interrupt"))
testSuite.addTest(test_oarjob("test_suspend"))
return testSuite
if __name__ == '__main__':
......
......@@ -23,27 +23,41 @@
# THE SOFTWARE.
import unittest
from .. import oarresource
from ..oarresource import Resource
class test_oarresource(unittest.TestCase):
def test_resource(self):
self.assertFalse(oarresource.Resource())
self.assertTrue(oarresource.Resource(core=1))
self.assertEqual(oarresource.Resource(nodes=1).cores,1)
self.assertEqual(oarresource.Resource(cpu=1).cores,1)
self.assertEqual(oarresource.Resource(core=1).cores,1)
res = oarresource.Resource(nodes=1,cpu=2,core=2,gpu=True,memory=8000)
cli = "-l nodes=1/cpu=2/core=2 -p \"gpu='YES' and mem_core_mb>=8000\""
self.assertEqual(cli,str(res))
def test_cores(self):
self.assertFalse(Resource())
self.assertTrue(Resource(core=1))
self.assertEqual(Resource(nodes=1).cores,1)
self.assertEqual(Resource(cpu=1).cores,1)
self.assertEqual(Resource(core=1).cores,1)
def test_to_cli(self):
res = Resource(nodes=1,cpu=2,core=2,gpu=True,mem_core_mb=8000)
cli = '-l nodes=1/cpu=2/core=2 -p "gpu=\'YES\' and mem_core_mb>=8000"'
self.assertEqual(cli,res.cli_string)
self.assertEqual(res.cores,4)
def test_from_cli(self):
res = Resource()
self.assertEqual(res,Resource.from_cli(res.cli_string))
res = Resource(core=10)
self.assertEqual(res,Resource.from_cli(res.cli_string))
res = Resource(nodes=1,cpu=2,core=2,gpu=True)
self.assertEqual(res,Resource.from_cli(res.cli_string))
res = Resource(nodes=1,cpu=2,core=2,mem_core_mb=8000)
self.assertEqual(res,Resource.from_cli(res.cli_string))
res = Resource(nodes=1,cpu=2,core=2,gpu=True,mem_core_mb=8000)
self.assertEqual(res,Resource.from_cli(res.cli_string))
def test_suite():
"""Test suite including all test suites"""
testSuite = unittest.TestSuite()
testSuite.addTest(test_oarresource("test_resource"))
testSuite.addTest(test_oarresource("test_cores"))
testSuite.addTest(test_oarresource("test_to_cli"))
testSuite.addTest(test_oarresource("test_from_cli"))
return testSuite
if __name__ == '__main__':
......
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 European Synchrotron Radiation Facility, Grenoble, France
#
# Principal author: Wout De Nolf (wout.de_nolf@esrf.eu)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import unittest
from .. import oarshell
class test_utils(unittest.TestCase):
def skipoar(self):
if not oarshell.oarinstalled():
raise unittest.SkipTest("oar cli tools not available")
def test_installed(self):
oarshell.oarinstalled()
def test_oarstat(self):
self.skipoar()
out,err,code = oarshell.jobstats(1)
self.assertTrue(isinstance(out,dict))
def test_cli(self):
args = '-p',"gpu=YES"
cmd1 = oarshell.cli_args2str(*args)
cmd2 = "-p gpu=YES"
self.assertEqual(cmd1,cmd2)
self.assertEqual(args,oarshell.cli_str2args(cmd2))
args = '-p',"gpu='YES'"
cmd1 = oarshell.cli_args2str(*args)
cmd2 = "-p gpu='YES'"
self.assertEqual(cmd1,cmd2)
self.assertEqual(('-p',"gpu=YES"),oarshell.cli_str2args(cmd1))
args = '-p',"gpu='YES' and mem_core_mb>=8000"
cmd1 = oarshell.cli_args2str(*args)
cmd2 = '-p "gpu=\'YES\' and mem_core_mb>=8000"'
self.assertEqual(cmd1,cmd2)
self.assertEqual(args,oarshell.cli_str2args(cmd2))
def test_suite():
"""Test suite including all test suites"""
testSuite = unittest.TestSuite()
testSuite.addTest(test_utils("test_installed"))
testSuite.addTest(test_utils("test_oarstat"))
testSuite.addTest(test_utils("test_cli"))
return testSuite
if __name__ == '__main__':
import sys
mysuite = test_suite()
runner = unittest.TextTestRunner()
if not runner.run(mysuite).wasSuccessful():
sys.exit(1)
......@@ -2,7 +2,6 @@
#
# Copyright (C) 2018 European Synchrotron Radiation Facility, Grenoble, France
#
# Principal author: Wout De Nolf (wout.de_nolf@esrf.eu)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
......@@ -22,28 +21,11 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import unittest
from .. import shell
from setuptools import setup
class test_utils(unittest.TestCase):
def test_installed(self):
shell.oarinstalled()
def test_oarstat(self):
shell.jobstats(1)
def test_suite():
"""Test suite including all test suites"""
testSuite = unittest.TestSuite()
testSuite.addTest(test_utils("test_installed"))
testSuite.addTest(test_utils("test_oarstat"))
return testSuite
if __name__ == '__main__':
import sys
mysuite = test_suite()
runner = unittest.TextTestRunner()
if not runner.run(mysuite).wasSuccessful():
sys.exit(1)
setup(name='oarpy',
version='0.0.1',
description='OAR job management in python',
url='https://gitlab.esrf.fr/denolf/oarpy',
license='MIT',
packages=['oarpy'])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment