Commit b00bad51 authored by Pierre Paleo's avatar Pierre Paleo
Browse files

Add README

parent 04c9bd59
*.pyc
*.log
*.stdout
*.stderr
# Sidi
Sidi (Simple Distributed) is a tiny Python module on top of [dask.distributed](http://distributed.dask.org).
It brings three main features:
* The `DaskScheduler` object
* The `DaskWorker` object
* The `RemoteClass` object
The first two aim to make it easier to instantiate a distributed scheduler and workers from Python scripts.
The last one is a wrapper around `distributed.Actor`.
\ No newline at end of file
"""
dsf
créer:
- si on ne spécifie pas les workers, on crée l'instance sur *tous* les workers
def create_remote_objects(client, class_object):
rws = []
for worker_addr in client.scheduler_info()["workers"].keys():
rw = client.submit(class_object, actors=True, workers=[worker_addr]).result() # todo init args
rws.append(rw)
return rws
soumettre une tâche: à un acteur, en conservant les "future":
il faut une routine qui bloque le "af.result()" :
def proj_dist(rw, n):
af = rw.proj(n)
return af.result()
et une autre pour soumettre cette routine, avec en paramètre l'acteur (pointeur vers remote class)
def submit_tasks2(cl, rw, n, i0=0):
for i in range(n):
f = cl.submit(proj_dist, rw, 1e10+i0+i)
f.add_done_callback(mycallback)
"""
class RemoteClass(object):
"""
Class living on a remote worker, based on distributed.Actor.
......@@ -47,9 +15,18 @@ class RemoteClass(object):
where wrapper_task is a blocking function waiting for an ActorFuture.
"""
def __init__(self, client, Class, worker, *args, **kwargs):
def __init__(
self,
client,
Class,
workers=None,
class_args=[],
class_kwargs={},
class_args_list=None,
class_kwargs_list=None
):
"""
Initialize a class on a single remote worker.
Initialize a class on one or more remote workers.
Parameters
-----------
......@@ -57,25 +34,46 @@ class RemoteClass(object):
Client to a distributed cluster/scheduler.
Class: object
Python class that will be instantiated on remote workers.
worker: str
Name/address of the target worker.
*args, **kwargs: arguments to pass to Class when instantiating it on workers.
workers: list of str
Name/address of the target workers.
class_args: list
Arguments to pass to each worker when instantiating the class.
class_kwargs: dict
Named arguments to pass to each worker when instantiating the class
class_args_list: list of list
List of "n" lists, where "n" is the length of "workers".
If used, the parameter "class_args" is ignored, and list number 0
serves as args for instantiating class on worker 0, and so on.
class_kwargs_list: list of dict
List of "n" dicts, where "n" is the length of "workers".
If used, the parameter "class_kwargs" is ignored, and list number 0
serves as kwargs for instantiating class on worker 0, and so on.
"""
self.client = client
self.name = Class.__name__
#~ setattr(self, self.name, )
self._available_workers = list(client.has_what().keys())
self.worker = worker
self.actors = {}
self._available_workers = list(client.has_what().keys())
self.workers = workers
# client.submit(workers=[...]) instantiates the actor on the first
# available worker, not all workers.
cli_args = [Class]
cli_args.extend(args)
cli_kwargs = {"actor": True, "workers": [worker]}
cli_kwargs.update(kwargs)
actor = client.submit(*cli_args, **cli_kwargs).result()
self.actor = actor
if workers is None:
self.workers = self._available_workers
if class_args_list is not None and len(class_args_list) != len(self.workers):
raise ValueError("class_args_list must be of the same length as the number of workers")
if class_kwargs_list is not None and len(class_kwargs_list) != len(self.workers):
raise ValueError("class_kwargs_list must be of the same length as the number of workers")
for i, worker_name in enumerate(self.workers):
class_args_ = class_args if class_args_list is None else class_args_list[i]
class_kwargs_ = class_kwargs if class_kwargs_list is None else class_kwargs_list[i]
cli_args = [Class]
cli_args.extend(class_args_)
cli_kwargs = {"actor": True, "workers": [worker_name]}
cli_kwargs.update(class_kwargs_)
actor = client.submit(*cli_args, **cli_kwargs).result()
self.actors[worker_name] = actor
@staticmethod
......@@ -86,7 +84,28 @@ class RemoteClass(object):
return af_res
def submit_task(self, method_name, callback=None, pure=False, method_args=[], method_kwargs={}):
def _get_first_available_worker(self):
    """
    Return the name of a worker that a task can be submitted to.

    The workers reported by `self.client.processing()` are scanned in
    order, and the first one that is both listed in `self.workers` and
    currently has no task is returned.
    Workers that are not in `self.workers` are ignored even when idle:
    no actor was instantiated on them, so a task cannot be routed there.

    Returns
    -------
    worker_name: str
        Name of the first idle worker among `self.workers`. When none of
        them is idle, `self.workers[0]` is returned.
    """
    managed_workers = set(self.workers)
    for worker_name, worker_tasks in self.client.processing().items():
        if worker_name in managed_workers and len(worker_tasks) == 0:
            return worker_name
    # All managed workers are busy - choose the first one
    return self.workers[0]
def submit_task(
self,
method_name,
callback=None,
workers=None,
method_args=[],
method_kwargs={},
who="first",
pure=False,
):
"""
Execute a method of a remote class.
......@@ -97,11 +116,19 @@ class RemoteClass(object):
callback: callable
Callback function executed as soon as a worker completed its task.
The callback must have only one argument (a future object).
workers: list
Specify a list of workers the work is submitted to.
If not provided, the task is submitted to the FIRST worker available
(see below for other options).
method_args: tuple
Tuple of arguments for the called method.
method_kwargs: dict
Dictionary of named arguments for the called method.
who: str
Which worker to take if workers=None.
Available options are:
"first": submit work to the first available worker
"all": submit work to all the workers
Returns
--------
futures: list
......@@ -122,13 +149,40 @@ class RemoteClass(object):
client = Client("tcp://127.0.0.1:8786")
R = Remote(client, MyClass)
futures = R.submit_task("do_work", callback=mycallback, method_kwargs={'dummy': 2})
Notes
-----
A distributed `Actor` is a pointer to a class instance living on
another process.
Therefore, when instantiating several actors [A1, A2, ...], one must
explicitly provide the actor the task is submitted to
(ex. `my_actor_1.method(args1); my_actor_2.method(args2)`).
This contrasts with `distributed` standard stateless approach, where
tasks are submitted by the client, and the scheduler determines which
worker takes care of the work.
The `submit_task()` method attempts to find the first available worker
bound to [A1, A2, ...] by interrogating `client.processing()`.
When all underlying workers are busy, the task is submitted to the first
worker. This is a very simple and limited heuristic.
"""
sub_args = [self._retrieve_actor_result, self.actor, method_name]
sub_args.extend(method_args)
sub_kwargs = {"pure": pure}
sub_kwargs.update(method_kwargs)
future = self.client.submit(*sub_args, **sub_kwargs)
if callback is not None:
future.add_done_callback(callback)
return future
# TODO check that each worker name, belongs to self.workers ?
if workers is None:
if who == "all":
workers_names = self.workers
else:
workers_names = [self._get_first_available_worker()]
else:
workers_names = workers
futures = []
for worker_name in workers_names:
actor = self.actors[worker_name]
sub_args = [self._retrieve_actor_result, actor, method_name]
sub_args.extend(method_args)
sub_kwargs = {"pure": pure}
sub_kwargs.update(method_kwargs)
future = self.client.submit(*sub_args, **sub_kwargs)
if callback is not None:
future.add_done_callback(callback)
futures.append(future)
return futures
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment