Commit 04c9bd59 authored by Pierre Paleo's avatar Pierre Paleo
Browse files

RemoteClass: submit only to one worker

parent 204ba9ee
......@@ -10,7 +10,7 @@ def exec_shell_command(cmd, use_shell=False):
out, err = p.communicate()
return err, out
# TODO use pycuda.Device.get_attributes()
def get_nvidia_gpuinfo(tmp_fname="/tmp/gpuinfo.xml"):
if not(__have_xmltodict__):
raise ImportError("The xmltodict package is required")
......
......@@ -47,9 +47,9 @@ class RemoteClass(object):
where wrapper_task is a blocking function waiting for an ActorFuture.
"""
def __init__(self, client, Class, workers=None, *args, **kwargs):
def __init__(self, client, Class, worker, *args, **kwargs):
"""
Initialize
Initialize a class on a single remote worker.
Parameters
-----------
......@@ -57,26 +57,25 @@ class RemoteClass(object):
Client to a distributed cluster/scheduler.
Class: object
Python class that will be instantiated on remote workers.
workers: list
If not specified, the class will be instantiated on all workers client
is connected to. Otherwise, a list specifying the workers names/addresses
can be provided.
worker: str
Name/address of the target worker.
*args, **kwargs: arguments to pass to Class when instantiating it on workers.
"""
self.client = client
self.name = Class.__name__
#~ setattr(self, self.name, )
self.all_workers = list(client.has_what().keys())
self.workers = workers
if workers is None:
self.workers = self.all_workers
self._available_workers = list(client.has_what().keys())
self.worker = worker
self.actors = {}
# client.submit(workers=[...]) instantiates the actor on the first
# available worker, not all workers. Therefore, we use this loop.
for worker_name in self.workers:
actor = client.submit(Class, actor=True, workers=[worker_name], *args, **kwargs).result()
self.actors[worker_name] = actor
# available worker, not all workers.
cli_args = [Class]
cli_args.extend(args)
cli_kwargs = {"actor": True, "workers": [worker]}
cli_kwargs.update(kwargs)
actor = client.submit(*cli_args, **cli_kwargs).result()
self.actor = actor
@staticmethod
......@@ -87,7 +86,7 @@ class RemoteClass(object):
return af_res
def submit_task(self, method_name, workers=None, callback=None, method_args=(), method_kwargs={}):
def submit_task(self, method_name, callback=None, pure=False, method_args=[], method_kwargs={}):
"""
Execute a method of a remote class.
......@@ -95,10 +94,6 @@ class RemoteClass(object):
----------
method_name: str
Name of the class method
workers: list
If not specified, the method will be executed on all remote workers
visible from client. Otherwise, a list specifying the workers
names/addresses can be provided.
callback: callable
Callback function executed as soon as a worker completed its task.
The callback must have only one argument (a future object).
......@@ -128,15 +123,12 @@ class RemoteClass(object):
R = Remote(client, MyClass)
futures = R.submit_task("do_work", callback=mycallback, method_kwargs={'dummy': 2})
"""
futures = []
for actor_name, actor in self.actors.items():
if workers is not None and actor_name not in workers:
continue
future = self.client.submit(
self._retrieve_actor_result, actor, method_name,
*method_args, **method_kwargs
)
if callback is not None:
future.add_done_callback(callback)
futures.append(future)
return futures
sub_args = [self._retrieve_actor_result, self.actor, method_name]
sub_args.extend(method_args)
sub_kwargs = {"pure": pure}
sub_kwargs.update(method_kwargs)
future = self.client.submit(*sub_args, **sub_kwargs)
if callback is not None:
future.add_done_callback(callback)
return future
......@@ -62,11 +62,11 @@ class DaskWorker(object):
self.own_loop = True
if use_nanny:
kwargs = {'worker_port': port, 'listen_address': None}
kwargs = {'worker_port': port, 'listen_address': None} # TODO
what = Nanny
else:
kwargs = {}
kwargs['service_ports'] = {'nanny': port}
#~ kwargs['service_ports'] = {'nanny': port} # TODO
what = Worker
if interface:
......@@ -144,7 +144,10 @@ class DaskWorker(object):
def stop(self):
self.logger.debug("Attempting to terminate workers")
for n in self.nannies:
n.kill() # stop() ?
if self.use_nanny:
n.kill()
else:
n.stop()
n.status = "closed"
......@@ -163,3 +166,23 @@ class DaskWorker(object):
logging_options = default_options
loggername = logging_options.pop("loggername")
self.logger = Logger(loggername, **logging_options)
def run_daskworker(**daskworker_args):
"""
This function should be run in another process.
Both class instantiation and start() should be run on a separate process,
as the event loop is created in the instantiation.
"""
S = DaskWorker(**daskworker_args)
S.start()
def spawn_worker_process(**worker_args):
Ps = Process(target=run_daskworker, kwargs=daskworker_args)
Ps.start()
return Ps
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment