Commit 04f38c05 authored by Pierre Paleo's avatar Pierre Paleo

Add worker and RemoteClass

parent 246dda6a
......@@ -150,6 +150,7 @@ class Logger(object):
            'class': 'logging.FileHandler',
            'formatter': 'default',
            'filename': self.logfile,
            'mode': 'w',  # <= what is the "best" default?
            #~ 'maxBytes': 1048576,
            #~ 'backupCount': 3
        }
......
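For reference, the commented-out maxBytes/backupCount fields above hint at log rotation. Below is a minimal, hedged sketch of a dictConfig using logging.handlers.RotatingFileHandler (filename and levels are assumptions, not from this commit); with rotation enabled, the choice of 'mode' matters less since files are recycled automatically.

# Hypothetical sketch: rotating-file variant of the handler config above.
# RotatingFileHandler appends by default, so the 'mode' question goes away.
import logging
import logging.config

LOGGING = {
    "version": 1,
    "formatters": {"default": {"format": "%(asctime)s %(levelname)s %(message)s"}},
    "handlers": {
        "file": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "default",
            "filename": "app.log",   # assumed path for the example
            "maxBytes": 1048576,     # rotate after ~1 MB
            "backupCount": 3,        # keep 3 rotated files
        },
    },
    "root": {"level": "DEBUG", "handlers": ["file"]},
}

logging.config.dictConfig(LOGGING)
logging.getLogger(__name__).info("rotating file logging configured")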
"""
dsf
créer:
- si on ne spécifie pas les workers, on crée l'instance sur *tous* les workers
def create_remote_objects(client, class_object):
rws = []
for worker_addr in client.scheduler_info()["workers"].keys():
rw = client.submit(class_object, actors=True, workers=[worker_addr]).result() # todo init args
rws.append(rw)
return rws
soumettre une tâche: à un acteur, en conservant les "future":
il faut une routine qui bloque le "af.result()" :
def proj_dist(rw, n):
af = rw.proj(n)
return af.result()
et une autre pour soumettre cette routine, avec en paramètre l'acteur (pointeur vers remote class)
def submit_tasks2(cl, rw, n, i0=0):
for i in range(n):
f = cl.submit(proj_dist, rw, 1e10+i0+i)
f.add_done_callback(mycallback)
"""
class RemoteClass(object):
    """
    Class living on remote workers, based on distributed.Actor.

    Notes
    -----
    As distributed.ActorFuture is not as featured as distributed.Future,
    a workaround is used to both execute "remote class instance methods" and
    benefit from distributed.Future features.
    As an Actor is "a pointer to a user-defined object living on a remote worker",
    we can submit tasks passing the actor reference:
        client.submit(wrapper_task, actor, args)
    where wrapper_task is a blocking function waiting for an ActorFuture.
    """
    def __init__(self, client, Class, workers=None, *args, **kwargs):
        """
        Initialize a RemoteClass.

        Parameters
        ----------
        client: distributed.Client
            Client connected to a distributed cluster/scheduler.
        Class: object
            Python class that will be instantiated on remote workers.
        workers: list, optional
            If not specified, the class is instantiated on all workers the
            client is connected to. Otherwise, a list of worker
            names/addresses can be provided.
        *args, **kwargs:
            Arguments passed to Class when instantiating it on workers.
        """
        self.client = client
        self.name = Class.__name__
        #~ setattr(self, self.name, )
        self.all_workers = list(client.has_what().keys())
        self.workers = workers
        if workers is None:
            self.workers = self.all_workers
        self.actors = {}
        # client.submit(workers=[...]) instantiates the actor on the first
        # available worker, not on all of them, hence this loop.
        for worker_name in self.workers:
            actor = self.client.submit(
                Class, *args, actor=True, workers=[worker_name], **kwargs
            ).result()
            self.actors[worker_name] = actor
    @staticmethod
    def _retrieve_actor_result(actor, method_name, *args, **kwargs):
        # Call the actor method and block until its ActorFuture completes,
        # so that this wrapper can itself be submitted as a regular task.
        actor_instance_method = getattr(actor, method_name)
        af = actor_instance_method(*args, **kwargs)
        af_res = af.result()  # blocks
        return af_res
    def submit_task(self, method_name, workers=None, callback=None, *args, **kwargs):
        """
        Execute a method of the remote class.

        Parameters
        ----------
        method_name: str
            Name of the class method.
        workers: list, optional
            If not specified, the method is executed on all remote workers
            visible from the client. Otherwise, a list of worker
            names/addresses can be provided.
        callback: callable, optional
            Callback function executed as soon as a worker completes its task.
            The callback must take exactly one argument (a future object).
        *args, **kwargs:
            Arguments of the called method.

        Returns
        -------
        futures: list
            A list of distributed.Future objects.

        Example
        -------
        class MyClass(object):
            def __init__(self):
                pass
            def do_work(self, dummy=1):
                print("Doing work")
                return 0

        def mycallback(fut):
            print("Callback !")

        client = Client("tcp://127.0.0.1:8786")
        R = RemoteClass(client, MyClass)
        futures = R.submit_task("do_work", callback=mycallback, dummy=2)
        """
        futures = []
        for actor_name, actor in self.actors.items():
            if workers is not None and actor_name not in workers:
                continue
            future = self.client.submit(
                self._retrieve_actor_result, actor, method_name,
                *args, **kwargs
            )
            if callback is not None:
                future.add_done_callback(callback)
            futures.append(future)
        return futures
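A short, hedged usage sketch of the class above; the scheduler address and MyClass are assumptions for illustration, not part of this commit.

# Hypothetical end-to-end usage of RemoteClass.
from distributed import Client

class MyClass(object):
    def do_work(self, dummy=1):
        return dummy * 2

if __name__ == "__main__":
    client = Client("tcp://127.0.0.1:8786")   # assumed running scheduler
    R = RemoteClass(client, MyClass)          # one actor per worker
    futures = R.submit_task("do_work", dummy=21)
    # These are regular distributed.Future objects, so gather() works,
    # unlike with raw ActorFuture objects.
    print(client.gather(futures))             # e.g. [42, 42, ...] (one per worker)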
from multiprocessing import Process
from tornado.ioloop import IOLoop
from distributed import Scheduler
from distributed.cli.utils import install_signal_handlers
from .utils import generate_random_string
from .logger import Logger
......@@ -22,6 +24,12 @@ class DaskScheduler(object):
        Dict where the keys are parameters of the Logger() class.
    bokeh_port: int, optional
        If set, the scheduler will serve a Bokeh dashboard at the given port.

    Notes
    -----
    The scheduler has to use its own event loop (tornado.IOLoop).
    As the GIL prevents effectively using different event loops in separate
    threads, the scheduler has to run in a separate process.
    """
    def __init__(
        self,
......@@ -66,7 +74,6 @@ class DaskScheduler(object):
        loggername = logging_options.pop("loggername")
        self.logger = Logger(loggername, **logging_options)

    def configure_bokeh(self, bokeh_port, bokeh_prefix=None):
        if bokeh_port is not None:
            from distributed.bokeh.scheduler import BokehScheduler
......@@ -75,6 +82,9 @@ class DaskScheduler(object):
    def start(self, start_ioloop=True):
        """
        Start the scheduler.
        """
        self.logger.info("Starting scheduler at %s" % self.addr)
        self.scheduler.start(self.addr)
        try:
......@@ -87,7 +97,24 @@ class DaskScheduler(object):
        self.logger.info("Stopping scheduler")
        self.scheduler.stop()

    def cleanup(self, signum):
        self.logger.critical("Received signal %s" % str(signum))
def run_scheduler(**scheduler_args):
    """
    Entry point to be run in a separate process.
    Both instantiation and start() must happen in that process, as the event
    loop is created at instantiation.
    """
    S = DaskScheduler(**scheduler_args)
    S.start()


def spawn_scheduler_process(**scheduler_args):
    Ps = Process(target=run_scheduler, kwargs=scheduler_args)
    Ps.start()
    return Ps
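A hedged sketch of the process-based startup; the address/port is an assumption, and the kwargs are whatever DaskScheduler accepts.

# Hypothetical driver code: start the scheduler in its own process,
# then connect a client to it.
from time import sleep
from distributed import Client

if __name__ == "__main__":
    Ps = spawn_scheduler_process()       # kwargs: whatever DaskScheduler accepts
    sleep(1)                             # crude wait for the scheduler to come up
    client = Client("tcp://127.0.0.1:8786")
    print(client.scheduler_info())
    client.close()
    Ps.terminate()                       # stop the scheduler process
    Ps.join()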
from random import choice
from string import ascii_letters, digits
#~ from psutil import cpu_count, virtual_memory, net_if_addrs, cpu_freq
import psutil
from socket import gethostname
from .gpuinfo import get_nvidia_gpuinfo
def generate_random_string(length):
    result = ""
......@@ -7,3 +13,69 @@ def generate_random_string(length):
    for i in range(length):
        result += choice(alphabet)
    return result
def get_mem_info():
    return psutil.virtual_memory()


def get_cpu_info():
    return psutil.cpu_count(), psutil.cpu_freq()


def get_machine_info():
    meminfo = get_mem_info()
    cpuinfo = get_cpu_info()
    try:
        nv_gpuinfo = get_nvidia_gpuinfo()
    except Exception:
        nv_gpuinfo = None
    M = 1024. ** 3
    infos = {
        "hostname": gethostname(),
        "cpu_count": cpuinfo[0],
        "cpu_maxfreq": "%.2f GHz" % (cpuinfo[1].max / 1e3),
        "mem_total": "%.1f GB" % (meminfo.total / M),
        "mem_used": "%.1f GB" % (meminfo.used / M),
        "mem_avail": "%.1f GB" % (meminfo.available / M),
        "gpu_count": 0,
    }
    if nv_gpuinfo is not None:
        gpu_infos = {
            "gpu_count": nv_gpuinfo["attached_gpus"],
            "gpu": [],
        }
        for ginfos in nv_gpuinfo["gpu"]:
            gpu_infos["gpu"].append({
                "name": ginfos["product_name"],
                "mem_total": ginfos["fb_memory_usage"]["total"],
                "mem_used": ginfos["fb_memory_usage"]["used"],
            })
        infos.update(gpu_infos)
    return infos
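For illustration, a quick look at what get_machine_info() returns; the values below are made up.

# Hypothetical output on a CPU-only node:
# {
#     "hostname": "node01",
#     "cpu_count": 8,
#     "cpu_maxfreq": "3.60 GHz",
#     "mem_total": "31.3 GB",
#     "mem_used": "4.2 GB",
#     "mem_avail": "25.9 GB",
#     "gpu_count": 0,
# }
if __name__ == "__main__":
    from pprint import pprint
    pprint(get_machine_info())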
"""
def print_machine_info():
meminfo = get_mem_info()
cpuinfo = get_cpu_info()
try:
nv_gpuinfo = get_nvidia_gpuinfo()
except:
nv_gpuinfo = None
M = 1024.**3
print("Hostname: \t %s" % gethostname())
print("CPUs: \t\t %d (%.1f GHz)" % (cpuinfo[0], cpuinfo[1].max/1e3))
print("Memory: \t %.1f GB (%.1f GB used)" % (meminfo.total/M, meminfo.used/M))
if nv_gpuinfo is not None:
print("GPUs: %s" % nv_gpuinfo["attached_gpus"])
for i, ginfos in enumerate(nv_gpuinfo["gpu"]):
print(" GPU %d: \t %s (%s GB, %s used)" % (
i,
ginfos["product_name"],
ginfos["fb_memory_usage"]["total"],
ginfos["fb_memory_usage"]["used"]
))
"""
from multiprocessing import cpu_count
from tornado import gen
from tornado.ioloop import IOLoop
from distributed import Worker, Nanny
from distributed.cli.utils import install_signal_handlers
from distributed.utils import get_ip_interface, parse_timedelta
from .utils import generate_random_string
from .logger import Logger
_ncores = cpu_count()


class DaskWorker(object):
    """
    Dask Worker/Nanny.

    Parameters
    ----------
    scheduler_addr: str
        Address of the scheduler, ex. tcp://127.0.0.1:8786
    nprocs: int, optional
        Number of worker processes to launch.
    nthreads: int, optional
        Number of threads per worker process.
    host: str, optional
        Worker listening host.
    port: int, optional
        Worker listening port.
    """
    def __init__(
        self,
        scheduler_addr,
        nprocs=1,
        nthreads=None,
        logging_args=None,
        host=None,
        port=None,
        use_nanny=True,
        interface=None,
        name=None,
        death_timeout=None,
        loop=None,
        memory_limit="auto",
        local_directory="/tmp",
        services=None,
        resources=None,
    ):
        self.name = name or "worker_" + generate_random_string(10)
        self.configure_logger(logging_args)
        self.logger.debug("Initializing worker '%s'" % self.name)
        self.use_nanny = use_nanny
        # Avoid a mutable default argument for services
        services = services or {}
        if not nthreads:
            # Share the available threads among worker processes
            nthreads = _ncores // nprocs
        self.loop = loop or IOLoop.current()
        self.logger.debug("Using loop %s" % str(self.loop))
        self.own_loop = True
        if use_nanny:
            kwargs = {'worker_port': port, 'listen_address': None}
            what = Nanny
        else:
            kwargs = {}
            kwargs['service_ports'] = {'nanny': port}
            what = Worker
        if interface:
            if host:
                raise ValueError("Cannot specify both interface and host")
            else:
                host = get_ip_interface(interface)
        if host is not None and port is not None:
            self.addr = "%s:%s" % (host, port)
        else:
            # Let the scheduler choose an appropriate address
            self.addr = None
        if death_timeout is not None:
            death_timeout = parse_timedelta(death_timeout, 's')
        # TODO support bokeh ?
        self.nannies = [
            what(
                scheduler_addr,
                ncores=nthreads,
                services=services,
                loop=self.loop,
                resources=resources,
                memory_limit=memory_limit,
                reconnect=True,
                local_dir=local_directory,
                death_timeout=death_timeout,
                name=self.name if nprocs == 1 else self.name + '-' + str(i),
                **kwargs
            )
            for i in range(nprocs)
        ]
        install_signal_handlers(self.loop, cleanup=self.on_signal)
    @gen.coroutine
    def close_all(self):
        # Unregister all workers from the scheduler
        if self.use_nanny:
            yield [n._close(timeout=2) for n in self.nannies]

    def on_signal(self, signum):
        self.logger.critical("Exiting on signal %d" % signum)
        self.close_all()

    @gen.coroutine
    def run(self):
        yield [n._start(self.addr) for n in self.nannies]
        while all(n.status != 'closed' for n in self.nannies):
            yield gen.sleep(0.2)
        self.logger.debug("All workers closed")
    def start(self):
        try:
            self.loop.run_sync(self.run)
        except Exception as exc:
            # Another worker was already started with the same IOLoop.
            # There must be a way to detect that the IOLoop is not owned
            # by the current thread ?
            if str(exc) == "IOLoop is already running":
                self.logger.warning("IOLoop %s is already used" % str(self.loop))
                self.own_loop = False
            else:
                self.logger.fatal("Exception: %s" % exc)  # logger.exception ?
                # raise ?
        finally:
            if self.own_loop:
                self.logger.info("Stopping worker '%s'" % self.name)

    def stop(self):
        self.logger.debug("Attempting to terminate workers")
        for n in self.nannies:
            n.kill()  # stop() ?
            n.status = "closed"
    # TODO factor with scheduler
    def configure_logger(self, logging_options):
        default_options = {
            "loggername": self.name,
            "level": "DEBUG",
            "logfile": self.name + ".log",
            "console": True,
            "capture_stderr": True,
        }
        # Merge user options into the defaults, so that "loggername" is
        # always present when popped below
        if logging_options is not None:
            default_options.update(logging_options)
        logging_options = default_options
        loggername = logging_options.pop("loggername")
        self.logger = Logger(loggername, **logging_options)
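A hedged sketch mirroring spawn_scheduler_process() above for the worker side; run_worker and spawn_worker_process are hypothetical helpers, and the scheduler address is an assumption.

# Hypothetical sketch: run a DaskWorker in its own process, since it also
# drives its own IOLoop.
from multiprocessing import Process

def run_worker(**worker_args):
    W = DaskWorker(**worker_args)
    W.start()

def spawn_worker_process(**worker_args):
    Pw = Process(target=run_worker, kwargs=worker_args)
    Pw.start()
    return Pw

if __name__ == "__main__":
    Pw = spawn_worker_process(scheduler_addr="tcp://127.0.0.1:8786", nprocs=2)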