Commit 01a31b84 authored by Pierre Paleo's avatar Pierre Paleo
Browse files

Small mods to RemoteClass. Add demo.

parent 04f38c05
import subprocess
try:
import xmltodict
__have_xmltodict__ = True
except ImportError:
__have_xmltodict__ = False
def exec_shell_command(cmd, use_shell=False):
p = subprocess.Popen(cmd.split(), shell=use_shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
return err, out
def get_nvidia_gpuinfo(tmp_fname="/tmp/gpuinfo.xml"):
if not(__have_xmltodict__):
raise ImportError("The xmltodict package is required")
cmd = str("nvidia-smi -q -x -f %s" % tmp_fname)
exec_shell_command(cmd)
with open(tmp_fname) as fid:
lines = fid.read()
infos = xmltodict.parse(lines)
infos = infos["nvidia_smi_log"]
return infos
......@@ -87,7 +87,7 @@ class RemoteClass(object):
return af_res
def submit_task(self, method_name, workers=None, callback=None, *args, **kwargs):
def submit_task(self, method_name, workers=None, callback=None, method_args=(), method_kwargs={}):
"""
Execute a method of a remote class.
......@@ -102,7 +102,10 @@ class RemoteClass(object):
callback: callable
Callback function executed as soon as a worker completed its task.
The callback must have only one argument (a future object).
*args, **kwargs: arguments of called method.
method_args: tuple
Tuple of arguments for the called method.
method_kwargs: dict
Dictionary of named arguments for the called method.
Returns
--------
......@@ -123,7 +126,7 @@ class RemoteClass(object):
client = Client("tcp://127.0.0.1:8786")
R = Remote(client, MyClass)
futures = R.submit_task("do_work", callback=mycallback, dummy=2)
futures = R.submit_task("do_work", callback=mycallback, method_kwargs={'dummy': 2})
"""
futures = []
for actor_name, actor in self.actors.items():
......@@ -131,7 +134,7 @@ class RemoteClass(object):
continue
future = self.client.submit(
self._retrieve_actor_result, actor, method_name,
*args, **kwargs
*method_args, **method_kwargs
)
if callback is not None:
future.add_done_callback(callback)
......
import os
from threading import Thread
from distributed import Client
from sidi.scheduler import spawn_scheduler_process
from sidi.worker import DaskWorker
from sidi.remote import RemoteClass
from time import sleep
class Dummy(object):
def __init__(self, a=1):
self.a = a
def do_work(self, a):
print("[%d] Doing work" % os.getpid())
sleep(a)
return a+1
def mycallback(fut):
print("callback time !")
def main():
P = spawn_scheduler_process(addr="tcp://127.0.0.1", port=5455)
W = DaskWorker("tcp://127.0.0.1:5455", nprocs=1, nthreads=1)
T = Thread(target=W.start)
T.start()
cl = Client("tcp://127.0.0.1:5455")
input("Instantiate remote class ?")
RC = RemoteClass(cl, Dummy, a=1)
print("...OK\n\n")
sleep(2)
input("Ready to submit...")
futures = RC.submit_task("do_work", callback=mycallback, method_args=(1,))
print(futures)
return P, W
if __name__ == "__main__":
try:
P, W = main()
finally:
W.stop()
P.terminate()
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment