Commit 30a57502 authored by Pierre Paleo

Start heuristic for dispatching work

parent a3bc4236
@@ -25,6 +25,7 @@ Getting started
 definitions.md
 ccdprocessing.md
 phase.md
+distribution.md
 pipeline.md
 nabu_config_file.md
@@ -3,6 +3,7 @@ from multiprocessing import cpu_count
 from psutil import virtual_memory
 from distributed import client, get_worker, worker_client
 from ..utils import check_supported
+from ..resources.computations import estimate_chunk_size
 from ..cuda.utils import collect_cuda_gpus
 from ..opencl.utils import collect_opencl_gpus
@@ -56,7 +57,7 @@ def get_workers_resources(client, try_cuda=True, try_opencl=True):
     return resources


-def estimate_workers_chunk_size(process_config, client=None, workers_resources=None, min_chunk=50, gpu_ids=None):
+def estimate_workers_chunk_size(process_config, client=None, workers_resources=None, min_chunk=50, try_cuda=True, try_opencl=True):
     """
     Estimate the maximum chunk size that can be used on each worker.
     Either `client` or `workers_resources` has to be provided.
@@ -73,13 +74,11 @@ def estimate_workers_chunk_size(process_config, client=None, workers_resources=None, min_chunk=50, try_cuda=True, try_opencl=True):
         Mutually exclusive with the `client` parameter.
     min_chunk: int
         Minimum chunk size.
-    gpu_ids: list of int, optional
-        GPU ID for each worker, optional. Each item of the list can be None.
     """
     if not((client is not None) ^ (workers_resources is not None)):
         raise ValueError("Please provide either 'client' or 'workers_resources' parameter")
     if workers_resources is None:
-        workers_resources = get_workers_resources(client, gpu_device_ids=gpu_ids)
+        workers_resources = get_workers_resources(client, try_cuda=try_cuda, try_opencl=try_opencl)
     chunks = {}
     for worker, resources in workers_resources.items():
         chunk_cpu = estimate_chunk_size(
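
For reference, here is a minimal usage sketch of the updated signature. It assumes a running dask `distributed` scheduler (the address below is hypothetical) and a `process_config` object as used elsewhere in nabu; the function presumably returns the per-worker `chunks` dict built above.

from distributed import Client

client = Client("tcp://127.0.0.1:8786")  # hypothetical scheduler address
# Probe each worker's resources and derive the largest chunk it can process,
# trying CUDA first, then OpenCL, with a floor of 50 slices.
chunks = estimate_workers_chunk_size(
    process_config,
    client=client,
    min_chunk=50,
    try_cuda=True,
    try_opencl=True,
)
for worker, chunk_size in chunks.items():
    print(worker, chunk_size)
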
@@ -127,5 +126,56 @@ def initialize_workers(client, process_config, chunk_sizes=None):
+def get_common_gpus(workers_resources):
+    """
+    Return the GPUs visible from several workers.
+    """
+    # ~ for worker, resources in workers_resources.items():
+    pass
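
The stub above could plausibly be completed along the lines of its commented-out loop. A sketch, assuming each per-worker resources dict exposes a `gpus` mapping keyed by GPU ID (this layout is an assumption, not confirmed by the diff):

def get_common_gpus_sketch(workers_resources):
    # Record which workers see each GPU ID
    seen_by = {}
    for worker, resources in workers_resources.items():
        for gpu_id in resources.get("gpus", {}):
            seen_by.setdefault(gpu_id, []).append(worker)
    # Keep only the GPUs visible from more than one worker
    return {gpu_id: workers for gpu_id, workers in seen_by.items() if len(workers) > 1}
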
+
+
+def dispatch_workers_local(workers_resources):
+    """
+    Dispatch resources to workers according to the "local" heuristic (for
+    computations distributed on the local machine).
+    Please see the documentation of `dispatch_resources_to_workers`.
+
+    Suppose we have `N_w` workers and `N_g` GPUs in total. Then we must have
+    `N_w <= N_g + 1` (the worker without a GPU, if any, uses the remaining
+    CPU cores to do computations with OpenCL).
+    """
+
+
+def dispatch_workers_slurm(workers_resources):
+    """
+    Dispatch resources to workers according to the "SLURM" heuristic.
+    It is assumed that the GPUs seen by a node are not seen by other nodes.
+    Please see the documentation of `dispatch_resources_to_workers`.
+    """
+
+
+DISPATCH_HEURISTICS = ["slurm", "local"]
+
+
+def dispatch_resources_to_workers(workers_resources, heuristic):
+    """
+    Dispatch available resources to workers according to a heuristic.
+    Available heuristics are: "slurm", "local".
+
+    The notations are the following:
+      - `N_w`: number of workers
+      - `N_g`: number of GPUs
+
+    Parameters
+    ----------
+    workers_resources: dict
+        Dictionary containing the information on workers resources, obtained
+        with `get_workers_resources()`.
+    heuristic: str
+        Name of the dispatch heuristic. Can be "slurm" or "local".
+    """
+    check_supported(heuristic, DISPATCH_HEURISTICS, "heuristic")
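
The body stops at the validation in this commit. One plausible completion, once the two heuristic functions are implemented, is to simply route on the heuristic name:

def dispatch_resources_to_workers_sketch(workers_resources, heuristic):
    check_supported(heuristic, DISPATCH_HEURISTICS, "heuristic")
    if heuristic == "local":
        return dispatch_workers_local(workers_resources)
    # heuristic == "slurm"
    return dispatch_workers_slurm(workers_resources)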