Commit f070707b authored by Carsten Richter's avatar Carsten Richter

Merge branch 'numcores' into 'master'

Allow to set number of processes

Closes #53

See merge request !78
parents b6ef204b 5a11f7ae
Pipeline #5235 passed with stages
in 6 minutes and 4 seconds
......@@ -31,6 +31,10 @@ __date__ = "26/04/2018"
import os as _os
import logging as _logging
from ._config import Config as _Config
config = _Config()
"""Global configuration shared with the whole library"""
# Attach a do nothing logging handler for xsocs
_logging.getLogger(__name__).addHandler(_logging.NullHandler())
......
......@@ -27,8 +27,10 @@
from __future__ import absolute_import
import argparse
from multiprocessing import cpu_count
from ..gui import xsocs_main
from .. import config
def main(argv):
......@@ -42,8 +44,20 @@ def main(argv):
'project_file',
nargs=argparse.OPTIONAL,
help='xsocs project file to open')
parser.add_argument(
'--numcores',
nargs='?',
type=int,
default=cpu_count(),
help='Max number of processes to use (default: %d)' % cpu_count())
options = parser.parse_args(argv[1:])
if options.numcores <= 0:
raise ValueError(
'Number of processes to use must be strictly positive')
config.DEFAULT_PROCESS_NUMBER = options.numcores
if options.project_file:
xsocs_main(projectH5File=options.project_file)
else:
......
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2018 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""This module contains library wide configuration.
"""
__authors__ = ["T. Vincent"]
__license__ = "MIT"
__date__ = "04/09/2018"
from multiprocessing import cpu_count
class Config(object):
"""
Class containing shared global configuration for the silx library.
"""
DEFAULT_PROCESS_NUMBER = cpu_count()
"""Default max number of processes to use (Use number of core by default)
This value must be strictly positive.
"""
......@@ -31,10 +31,11 @@ __date__ = "15/09/2016"
from collections import OrderedDict
from multiprocessing import Pool, cpu_count, Manager, queues
from multiprocessing import Pool, Manager, queues
from silx.gui import qt as Qt, icons
from ... import config
from ..model.Node import Node
from ..model.ModelDef import ModelColumns
......@@ -160,7 +161,7 @@ def getIntensity(projectFile, pathTpl, view=None):
projectLock = manager.Lock()
queue = manager.Queue()
n_proc = cpu_count()
n_proc = config.DEFAULT_PROCESS_NUMBER
pool = Pool(n_proc,
maxtasksperchild=2)
......
......@@ -39,6 +39,7 @@ import multiprocessing.sharedctypes as mp_sharedctypes
import numpy as np
# from silx.math import curve_fit
from ... import config
from ...io import QSpaceH5
from ...fit import (GaussianFitter, GaussianResults,
CentroidFitter, CentroidResults,
......@@ -61,9 +62,8 @@ class PeakFitter(Thread):
[1, 2, 3] is provided, only those cubes will be fitted.
:type img_indices: *optional* `array_like`
:param n_proc: number of process to use. If None, the number of process
used will be the one returned by multiprocessing.cpu_count().
:type n_proc: `int`
:param Union[int,None] n_proc:
Number of process to use. If None, the config value is used.
"""
READY, RUNNING, DONE, ERROR, CANCELED = __STATUSES = range(5)
......@@ -93,7 +93,7 @@ class PeakFitter(Thread):
if n_proc:
self.__n_proc = n_proc
else:
n_proc = self.__n_proc = mp.cpu_count()
n_proc = self.__n_proc = config.DEFAULT_PROCESS_NUMBER
self.__shared_progress = mp_sharedctypes.RawArray(ctypes.c_int32,
n_proc)
......
......@@ -38,13 +38,14 @@ import os.path
import ctypes
from threading import Thread
import multiprocessing.sharedctypes as mp_sharedctypes
from multiprocessing import Pool, cpu_count, Manager
from multiprocessing import Pool, Manager
import h5py
import numpy as np
import fabio
from ...io import XsocsH5
from ... import config
_logger = logging.getLogger(__name__)
......@@ -121,7 +122,7 @@ class KmapMerger(object):
self.__spec_h5 = spec_h5
self.__callback = callback
self.__n_proc = None
self.__n_proc = config.DEFAULT_PROCESS_NUMBER
self.__compression = 'lzf'
self.__prefix = 'prefix'
self.__overwrite = False
......@@ -254,15 +255,10 @@ class KmapMerger(object):
with XsocsH5.XsocsH5MasterWriter(master_f, mode=mode):
pass
if self.__n_proc is None:
n_proc = cpu_count()
else:
n_proc = self.__n_proc
# setting progress to 0
np.frombuffer(self.__shared_progress, dtype='int32')[:] = 0
pool = Pool(n_proc,
pool = Pool(self.n_proc,
initializer=_init_process,
initargs=(term_evt,
self.__shared_progress),
......@@ -530,15 +526,13 @@ class KmapMerger(object):
@n_proc.setter
def n_proc(self, n_proc):
if n_proc is None:
self.__n_proc = None
return
if n_proc is None: # Use default
n_proc = config.DEFAULT_PROCESS_NUMBER
n_proc = int(n_proc)
if n_proc <= 0:
self.__n_proc = None
else:
self.__n_proc = n_proc
raise ValueError('n_proc must be strictly positive')
self.__n_proc = n_proc
image_roi = property(lambda self: self.__image_roi)
......
......@@ -90,10 +90,10 @@ def merge_scan_data(output_dir,
headers.
:type img_dir: *optional* str
:param n_proc: Number of threads to use when merging files. If None, the
number of threads used will be the value returned by the function
`multiprocessing.cpu_count()`
:type n_proc: *optional* str
:param Union[int,None] n_proc:
Number of threads to use when merging files.
If None, the number of processes used will be the
default config value (usually the number of cores).
:param version: version of the spec file. It is currently used to get
the offset and padding to apply to the nextNr value found in the spec scan
......
......@@ -41,6 +41,7 @@ import multiprocessing.sharedctypes as mp_sharedctypes
import numpy as np
import xrayutilities as xu
from ... import config
from ...util import bin_centers_to_range_step
from ...util.medianfilter import medfilt2d
from ...util.histogramnd_lut import histogramnd_get_lut, histogramnd_from_lut
......@@ -162,7 +163,10 @@ class QSpaceConverter(object):
""" Indices of sample positions that will be converted. """
n_proc = property(lambda self: self.__n_proc)
""" Number of processes to use. Will use cpu_count() if None or 0. """
"""Number of processes to use.
Uses the default config value if set to None.
"""
roi = property(lambda self: self.__params['roi'])
""" Selected ROI in sample coordinates : [xmin, xmax, ymin, ymax] """
......@@ -243,7 +247,7 @@ class QSpaceConverter(object):
"""Whether or not to apply Maxipix module edges correction (bool)"""
self.__callback = callback
self.__n_proc = None
self.__n_proc = config.DEFAULT_PROCESS_NUMBER
self.__overwrite = False
self.__shared_progress = None
......@@ -925,12 +929,8 @@ class QSpaceConverter(object):
write_lock = manager.Lock()
idx_queue = manager.Queue()
n_proc = self.n_proc
if n_proc is None or n_proc <= 0:
n_proc = mp.cpu_count()
self.__shared_progress = mp_sharedctypes.RawArray(ctypes.c_int32,
n_proc)
self.n_proc)
np.frombuffer(self.__shared_progress, dtype='int32')[:] = 0
if shiftH5 is not None and shifted_idx is not None:
......@@ -948,7 +948,7 @@ class QSpaceConverter(object):
shared_shifted = None
shared_shifted_shape = None
pool = mp.Pool(n_proc,
pool = mp.Pool(self.n_proc,
initializer=_init_thread,
initargs=(idx_queue,
write_lock,
......@@ -996,7 +996,7 @@ class QSpaceConverter(object):
# creating the processes
results = []
for th_idx in range(n_proc):
for th_idx in range(self.n_proc):
arg_list = (th_idx,
entry_files,
entries,
......@@ -1016,7 +1016,7 @@ class QSpaceConverter(object):
# sending the None value to let the threads know that they
# should return
for th_idx in range(n_proc):
for th_idx in range(self.n_proc):
idx_queue.put(None)
pool.close()
......@@ -1098,19 +1098,13 @@ class QSpaceConverter(object):
@n_proc.setter
def n_proc(self, n_proc):
""" Sets the number of processes to use. If None or 0 the number of
processes used will be the number returned by
multiprocessing.cpu_count.
"""
if n_proc is None:
self.__n_proc = None
return
n_proc = config.DEFAULT_PROCESS_NUMBER
n_proc = int(n_proc)
if n_proc <= 0:
self.__n_proc = None
else:
self.__n_proc = n_proc
raise ValueError('n_proc must be strictly positive')
self.__n_proc = n_proc
def abort(self, wait=True):
"""
......
......@@ -60,9 +60,10 @@ def kmap_2_qspace(xsocsH5_f,
y_max.
:type roi: *optional* `array_like` (x_min, x_max, y_min, y_max)
:param n_proc: number of process to use. If None, the number of process
used will be the one returned by multiprocessing.cpu_count().
:type n_proc: `int`
:param Union[int,None] n_proc:
number of process to use.
If None, the number of processes used will be the
default config value (usually the number of cores).
:param overwrite: if set to False, an exception will be raise if the output
file already exists.
......
......@@ -41,6 +41,7 @@ import numpy as np
from silx.utils.testutils import ParametricTestCase
from xsocs import config
from xsocs.test.testfilesmanager import TestFilesManager
from xsocs.process.fit.peak_fit import PeakFitter
......@@ -130,6 +131,8 @@ class TestPeakFitter(ParametricTestCase):
@classmethod
def setUpClass(cls):
config.DEFAULT_PROCESS_NUMBER = 2 # Limit number of processes
cls._tmpdir = tempfile.mkdtemp()
manager = cls._manager = TestFilesManager('www.silx.org/pub/'
'xsocs/test_data/')
......
......@@ -37,6 +37,8 @@ import tempfile
import unittest
from xsocs import config
from xsocs.test.testfilesmanager import TestFilesManager
from xsocs.process.merge.KmapMerger import KmapMerger
......@@ -77,6 +79,8 @@ class TestMerger(unittest.TestCase):
@classmethod
def setUpClass(cls):
config.DEFAULT_PROCESS_NUMBER = 2 # Limit number of processes
cls._tmpdir = tempfile.mkdtemp()
cls._spec_h5 = os.path.join(cls._tmpdir, 'spec_h5.h5')
manager = cls._manager = TestFilesManager('www.silx.org/pub/'
......
......@@ -38,6 +38,7 @@ import numpy as np
from silx.utils.testutils import ParametricTestCase
from xsocs import config
from xsocs.test.testfilesmanager import TestFilesManager
from xsocs.io.QSpaceH5 import QSpaceH5
......@@ -50,6 +51,8 @@ class TestQSpace(ParametricTestCase):
@classmethod
def setUpClass(cls):
config.DEFAULT_PROCESS_NUMBER = 2 # Limit number of processes
cls._tmpdir = tempfile.mkdtemp()
manager = cls._manager = TestFilesManager(
'www.silx.org/pub/xsocs/test_data/')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment