Skip to content
Snippets Groups Projects
Commit 026df5c5 authored by Joao P C Bertoldo's avatar Joao P C Bertoldo
Browse files

implement shared memory allocation

parent 8f344024
No related branches found
No related tags found
1 merge request!6Draft: Resolve "py-bkg-rm"
......@@ -21,6 +21,8 @@ dependencies:
# pip stuff
- pip:
- silx==0.15.0
# dev stuff
- darfix[full]
- viztracer==0.12.3
"""Create a numpy array from a shared-memory buffer."""
import logging
import numpy as np
from numpy import ndarray
from Orange.widgets import gui, settings, widget
from Orange.widgets.utils.signals import Input, Output
from Orange.widgets.widget import OWWidget
from pydct.common import data2shared
from silx.gui.qt import QFileDialog, QThread
_logger = logging.getLogger("orangecontrib.pydct")
class AllocateSharedMemoryThread(QThread):
    """Qt thread that runs ``data2shared`` on an array off the calling thread.

    The two values returned by ``data2shared`` are stored on the instance as
    ``data_shared`` and ``multiprocessing_array``; both stay ``None`` until
    ``run`` has completed.
    """

    def __init__(self, parent, data):
        """Store the input array and initialise the result slots to ``None``.

        Args:
            parent: Qt parent object of the thread.
            data: array handed to ``data2shared`` when the thread runs.
        """
        super().__init__(parent=parent)
        # result slots, filled in by ``run``
        self.data_shared = None
        self.multiprocessing_array = None
        self.data = data  # todo make dataclass

    def run(self):
        """Call ``data2shared`` on the stored array and keep both results."""
        # ``None`` as second argument: no pre-existing raw array is supplied
        shared_view, raw_array = data2shared(self.data, None)
        self.data_shared = shared_view
        self.multiprocessing_array = raw_array
class AllocateSharedMemory(OWWidget):
    """Orange widget that runs ``data2shared`` on its input array in a background thread.

    Workflow:
        1. an array arrives on the ``data`` input (``set_data``), which enables the "go" button;
        2. pressing "go" (``_run``) starts an ``AllocateSharedMemoryThread``;
        3. when the thread finishes (``_send_signal``) both results are sent on the outputs.

    If new input arrives while an allocation is running, that allocation is aborted
    and its results are discarded.
    """

    name = "allocate shared memory"
    description = "todo"
    icon = "icons/one_round.svg"
    want_main_area = False
    resizing_enabled = True

    class Inputs:
        data = Input("data", ndarray)

    class Outputs:
        data_shared = Output("data_shared", ndarray)
        # NOTE(review): ``data2shared`` is annotated as returning a ``RawArray`` for this
        # value, yet the output is declared as ``ndarray`` -- confirm the intended type.
        multiprocessing_array = Output("multiprocessing_array", ndarray)

    def __init__(self):
        super().__init__()
        self.run_button = gui.button(widget=self.controlArea, master=self, label="go", callback=self._run)
        self.run_button.setDisabled(True)
        self._computing = False
        # thread of the allocation in progress (``None`` when nothing is running)
        self._thread = None
        self.data = None
        self.data_shared = None
        self.multiprocessing_array = None

    @Inputs.data
    def set_data(self, data):
        """Receive a new input array (or ``None``) and update the widget state.

        A running allocation is aborted first, because its result would refer to
        the previous input.

        todo add validations here
        """
        if self._computing:
            _logger.warning("allocation will be terminated")
            self._abort_allocation()
        self.data = data
        if data is None:
            self.run_button.setDisabled(True)
            self.information()
        else:
            self.run_button.setDisabled(False)
            self.information("new data, waiting to run")

    def _abort_allocation(self):
        """Stop the allocation in progress and make sure its completion is ignored."""
        thread, self._thread = self._thread, None
        self._computing = False
        if thread is None:
            return
        # Disconnect *before* terminating: otherwise the aborted thread's completion
        # notification would still reach ``_send_signal`` and override the state set
        # for the new input (message, error, button).
        self._disconnect_finished(thread)
        thread.terminate()  # todo check if there is a better way to do this

    def _disconnect_finished(self, thread):
        """Detach ``_send_signal`` from ``thread.finished``, tolerating a missing connection."""
        try:
            thread.finished.disconnect(self._send_signal)
        except (TypeError, RuntimeError):
            # the Qt bindings raise when the connection does not exist (anymore)
            pass

    def _run(self):
        """Start the background allocation for the current input (callback of the "go" button)."""
        if self._computing:
            _logger.warning("already allocating")
            return
        if self.data is None:
            _logger.error("trying to run with data none, this shouldn't happen")
            return
        self.run_button.setDisabled(True)
        # drop the error message of a previous failed run, if any
        self.error()
        self._thread = AllocateSharedMemoryThread(
            parent=self,
            data=self.data,
        )
        self._thread.finished.connect(self._send_signal)
        self._computing = True
        self.information("allocating...")
        self._thread.start()

    def _send_signal(self):
        """Collect the results of the finished thread and send them on the outputs."""
        thread = self._thread
        if thread is None or not self._computing:
            # late notification from an allocation that was aborted in the meantime
            return
        self._disconnect_finished(thread)
        _logger.info("done allocating shared memory")
        self._computing = False
        if thread.data_shared is not None and thread.multiprocessing_array is not None:
            self.information("done")
            self.data_shared = thread.data_shared
            self.multiprocessing_array = thread.multiprocessing_array
            self.Outputs.data_shared.send(self.data_shared)
            self.Outputs.multiprocessing_array.send(self.multiprocessing_array)
        else:
            # do not claim "done" when the thread left no results behind
            self.information()
            self.error("something went wrong and the thread did not return the results properly")
        self.run_button.setDisabled(False)
......@@ -190,7 +190,7 @@ class namespace2kwargs:
return wrapper
def _data2shared(data: ndarray, multiprocessing_rawarray: Optional[RawArray]) -> Tuple[ndarray, RawArray]:
def data2shared(data: ndarray, multiprocessing_rawarray: Optional[RawArray]) -> Tuple[ndarray, RawArray]:
"""Check if the _data is already in a shared memory space or return such if not.
This is to avoid recopying the _data to a cross-process shared space, a very long operation.
......
......@@ -31,7 +31,7 @@ from typing import Optional, Tuple, Union
import numpy as np
from numpy import ndarray
from pydct import log
from pydct.common import _data2shared
from pydct.common import data2shared
# ========================================== module setup ==========================================
logger = logging.getLogger("pydct.preprocess.moving_median_removal")
......@@ -172,7 +172,7 @@ def remove_moving_medians(
f"{data.shape=} ==> {nz=} and {median_validity=}, the latter is not a factor of the former (see docstring for more detail)"
)
data, mp_data_shared = _data2shared(data, multiprocessing_rawarray)
data, mp_data_shared = data2shared(data, multiprocessing_rawarray)
idxs, validity_bounds, window_bounds = _get_zwindows(nz, median_validity, median_window)
......
......@@ -18,7 +18,7 @@ from typing import Optional, Tuple, Union
import numpy as np
from numpy import dtype, ndarray
from pydct import log
from pydct.common import _data2shared
from pydct.common import data2shared
from scipy.ndimage import median_filter
# ======================================= module setup =======================================
......@@ -89,7 +89,7 @@ def xyfilter_median(
filter_dimensions = (1, filter_dimensions[0], filter_dimensions[1])
data, mp_data_shared = _data2shared(data, multiprocessing_rawarray)
data, mp_data_shared = data2shared(data, multiprocessing_rawarray)
real_nprocs = nprocs or multiprocessing.cpu_count()
zbatch_size = int(np.ceil(nz / real_nprocs)) # only one task per worker because it's slightly better
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment