Skip to content
Snippets Groups Projects
Commit cc19bee6 authored by Nicola Vigano's avatar Nicola Vigano
Browse files

OAR/Python: added initial OAR API python modules and some utilities

parent 38d7f284
No related branches found
No related tags found
No related merge requests found
import oar_api
import oar_utils
#Embedded file name: oar_api.py
"""
Created on May 22, 2013
@author: ben
"""
import subprocess
import getpass
from yaml import load, load_all
try:
from yaml import CLoader as Loader
except ImportError as exc:
print exc
print("No C implementation of the YAML parser, using pure Python")
from yaml import Loader
try:
import requests
from requests.exceptions import ConnectionError, HTTPError
from urllib2 import URLError
can_use_rest_api = True
except ImportError as exc:
print(exc)
print("No 'requests' module, using Command line backend")
can_use_rest_api = False
class OarAPI(object):
def __init__(self, api_type = "RestAPI", auth = None):
url = "http://lin-radeb01.esrf.fr/oarapi/jobs.yaml"
can_use_rest_api = OarBackendRestAPI.checkRestAPI(url)
if (can_use_rest_api is False) and (api_type == "RestAPI"):
api_type = "CmdAPI"
if api_type == "RestAPI":
if auth is None:
username = getpass.getuser()
print("Username: %s" % username)
password = getpass.getpass()
auth = requests.auth.HTTPBasicAuth(username=username, \
password=password)
self.backend = OarBackendRestAPI(auth)
elif api_type == "CmdAPI":
self.backend = OarBackendCmdAPI()
else:
raise ValueError("Unknown API: %s" % api_type)
def query(self, jobs = [], array = None, full_details = True, username = ""):
"""
records = query(jobs = [], array = None, full_details = True, username = "")
"""
result = {}
if len(jobs) > 0:
output = self.backend.query(jobs, None, full_details, username)
output = load(output, Loader=Loader)
result.update(output)
if array is not None:
output = self.backend.query([], array, full_details, username)
output = load(output, Loader=Loader)
result.update(output)
if len(jobs) == 0 and array is None:
output = self.backend.query([], None, full_details, username)
output = load(output, Loader=Loader)
result.update(output)
return result
def delete(self, jobs = [], array = None):
"""
delete(jobs = [], array = None)
"""
if len(jobs) > 0 and array is not None:
raise ValueError('OAR: it is only possible to work exclusively on jobs or array')
elif len(jobs) == 0 and array is None:
raise ValueError('OAR: no jobs or array specified')
else:
self.backend.delete(jobs, array)
def submit(self, command_path):
"""
job_ids = submit(command_path)
"""
output = self.backend.submit(command_path)
output = load_all(output, Loader=Loader)
job_ids = [ elem['job_id'] for elem in output ]
return {'array_id': job_ids[0], 'job_ids': job_ids}
class OarBackendAPI(object):
def query(self, jobs, array, full_details, username):
raise NotImplementedError('Use one of the derived classes!')
def delete(self, jobs, array):
raise NotImplementedError('Use one of the derived classes!')
def submit(self, command_path):
raise NotImplementedError('Use one of the derived classes!')
class OarBackendRestAPI(OarBackendAPI):
headers = {'content-type': 'text/yaml'}
@staticmethod
def checkRestAPI(url):
try:
r = requests.get(url, headers=OarBackendRestAPI.headers)
r.raise_for_status()
content = r.headers['content-type'].split("; ")
if content[0] != OarBackendRestAPI.headers['content-type']:
raise HTTPError("Wrong content type returned from the server")
return True
except (ConnectionError, HTTPError, URLError) as exc:
print(exc)
return False
def __init__(self, auth, base_url = "http://lin-radeb01.esrf.fr"):
self.base_url = base_url
self.auth = auth
def query(self, jobs, array, full_details, username):
params = {"structure" : "oar"}
if len(jobs) > 0:
full_text = [];
for job in jobs:
full_url = "/".join([self.base_url, "oarapi", "jobs", "%d.yaml" % job])
r = requests.get(full_url, params=params, headers=self.headers, auth=self.auth)
r.raise_for_status()
full_text.append(r.text)
return "\n".join(full_text)
else:
if array is not None:
params.update({"array" : array})
if full_details is True:
full_url = "/".join([self.base_url, "oarapi", "jobs", "details.yaml"])
else:
full_url = "/".join([self.base_url, "oarapi", "jobs.yaml"])
if username != "":
params.update({"user" : username})
r = requests.get(full_url, params=params, headers=self.headers, auth=self.auth)
r.raise_for_status()
return r.text
def delete(self, jobs, array):
if array is not None:
raise ValueError("Deletion of arrays is not allowed in REST API")
full_text = [];
for job in jobs:
full_url = "/".join([self.base_url, "oarapi", "jobs", "%d.yaml" % job])
r = requests.delete(full_url, headers=self.headers, auth=self.auth)
r.raise_for_status()
full_text.append(r.text)
return "\n".join(full_text)
class OarBackendCmdAPI(OarBackendAPI):
def _call_oar(self, cmd):
subproc = subprocess.Popen(cmd, stdout=subprocess.PIPE, \
stderr=subprocess.PIPE)
try:
ret_code = subproc.wait()
except KeyboardInterrupt as ex:
print("User interrupted..")
if subproc.poll() is None:
print("Killing query..")
subproc.kill()
raise ex
if ret_code != 0:
raise ValueError("OAR: query failed with return code %d and message:\n%s" \
% (ret_code, subproc.stderr.read()))
return subproc.stdout.read()
def query(self, jobs, array, full_details, username):
cmd = ['oarstat', '--yaml']
if full_details is True:
cmd.append('--full')
if username != '':
cmd = cmd + ['--user', username]
for job in jobs:
cmd = cmd + ['--job', '%d' % job]
if array is not None:
cmd = cmd + ['--array', '%d' % array]
result = self._call_oar(cmd)
if result == '--- {}\n':
raise ValueError("OAR: No jobs that satisfy the criteria")
return result
def delete(self, jobs, array):
cmd = ['oardel']
if array is not None:
cmd = cmd + ['--array', '%d' % array]
cmd = cmd + [ '%d' % job for job in jobs ]
return self._call_oar(cmd)
def submit(self, command_path):
cmd = ['oarsub', '--yaml', '-S', command_path]
output = self._call_oar(cmd)
result = []
is_yaml = False
for line in output.split('\n'):
if line == '##########':
is_yaml = not is_yaml
elif is_yaml is True:
result.append(line)
return '\n'.join(result)
'''
Created on Jun 24, 2013
@author: ben
'''
import oar_api
import time
class OARMonitor(object):
def __init__(self):
self.oar_conn = oar_api.OarAPI(api_type="CmdAPI")
def _update_info(self):
raise NotImplementedError("Use derived Class!")
def _terminated(self):
return all(job_info['state'] == "Terminated" for job_info in self.info.itervalues())
def _terminated_correctly(self):
return all(job_info['exit_code'] == "0 (0,0,0)" for job_info in self.info.itervalues())
def _get_error_records(self):
return [job_info for job_info in self.info.itervalues() if job_info['exit_code'] != "0 (0,0,0)"]
def monitor(self, delay = 10, timeout = None):
result = None
try:
t = time.time()
self._update_info()
while not self._terminated():
if (timeout is not None and ((time.time() - t) > timeout)):
raise RuntimeError("Reached Timeout!")
time.sleep(delay)
self._update_info()
result = {"terminated_correctly" : self._terminated_correctly()}
if result["terminated_correctly"] is False:
result.update({"error_records" : self._get_error_records()})
except KeyboardInterrupt:
print("Terminated by user.")
except RuntimeError as exc:
print(exc)
return result
class OARMonitorArray(OARMonitor):
def __init__(self, array):
OARMonitor.__init__(self)
self.array = array
def _update_info(self):
self.info = self.oar_conn.query(array = self.array)
class OARMonitorJobs(OARMonitor):
def __init__(self, jobs):
OARMonitor.__init__(self)
self.jobs = jobs
def _update_info(self):
self.info = self.oar_conn.query(jobs = self.jobs)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment