Commit e9d712c1 authored by Thomas Vincent's avatar Thomas Vincent
Browse files

avoid writing to hdf5 from multiprocessing part

parent ed3310c8
......@@ -31,7 +31,9 @@ __date__ = "15/09/2016"
from collections import OrderedDict
from multiprocessing import Pool, Manager, queues
import functools
import logging
from multiprocessing import Pool
from silx.gui import qt as Qt, icons
......@@ -49,6 +51,8 @@ from .ProjectItem import ProjectItem
from .ProjectDef import ItemClassDef
_logger = logging.getLogger(__name__)
@ItemClassDef('IntensityItem')
class IntensityItem(ProjectItem):
......@@ -102,32 +106,13 @@ class IntensityGroup(ProjectItem):
return IntensityItem(self.filename, itemPath)
def _getIntensity(entry, entry_f, projectLock, projectFile, pathTpl, queue):
    """Pool worker: compute the summed intensity of one entry and store it.

    Runs in a multiprocessing worker. Progress is reported through *queue*:
    one message when starting, one when finished (state 'done' or 'error').

    :param entry: name of the entry being processed; used as the message
        id and to format the destination dataset path.
    :param entry_f: filename of the HDF5 file holding this entry.
    :param projectLock: shared lock serializing writes to the project file.
    :param projectFile: filename of the project file receiving the result.
    :param pathTpl: format template yielding the dataset path for *entry*.
    :param queue: shared queue receiving progress/state messages.
    """
    queue.put({'id': entry,
               'state': 'started',
               'done': False})
    try:
        # TODO : this works because each entry has its own separate file.
        # Watch out for errors (maybe?) if one day there is only one file
        # for all entries
        with XsocsH5(entry_f) as entryH5:
            # there should be only one entry --> removed `entry` argument
            cumul = entryH5.image_cumul()
            dsetPath = pathTpl.format(str(entry))
            # WARNING, make sure the file isn't opened in write mode
            # elsewhere!!!
            # Use the lock as a context manager so it is released even if
            # writing the item raises (a plain acquire/release would leave
            # the lock held forever and stall the other workers).
            with projectLock:
                IntensityItem(projectFile, dsetPath, mode='r+', data=cumul)
    except Exception:
        # Catch Exception rather than a bare except so that
        # KeyboardInterrupt/SystemExit still propagate; the failure is
        # reported to the caller through the queue.
        state = 'error'
    else:
        state = 'done'
    queue.put({'id': entry,
               'state': state,
               'done': True})
def _getIntensity(entry_f):
    """Return the cumulated (summed) intensity image of the entry in *entry_f*.

    :param entry_f: filename of the HDF5 file holding the entry.
    :return: the cumulated image for that entry.
    """
    # TODO : relies on each entry living in its own separate file; watch
    # out for errors (maybe?) if one day a single file holds all entries.
    with XsocsH5(entry_f) as entry_h5:
        return entry_h5.image_cumul()
def getIntensity(projectFile, pathTpl, view=None):
......@@ -157,43 +142,47 @@ def getIntensity(projectFile, pathTpl, view=None):
mw.show()
app.processEvents()
manager = Manager()
projectLock = manager.Lock()
queue = manager.Queue()
pool = Pool(config.DEFAULT_PROCESS_NUMBER)
results = OrderedDict()
n_proc = config.DEFAULT_PROCESS_NUMBER
def callback(subject, entry, _):
    """apply_async success callback: notify listeners that *entry* is done.

    :param subject: object whose ``sigStateChanged`` signal is emitted.
    :param entry: identifier of the entry that completed.
    :param _: result of the async task (unused here).
    """
    message = {'id': entry, 'state': 'done'}
    subject.sigStateChanged.emit(message)
pool = Pool(n_proc,
maxtasksperchild=2)
results = OrderedDict()
def error_cb(subject, entry, _):
    """apply_async error callback: log the failure and notify listeners.

    :param subject: object whose ``sigStateChanged`` signal is emitted.
    :param entry: identifier of the entry that failed.
    :param _: the exception raised by the async task — presumably; that is
        what Pool.apply_async passes to error_callback (unused here).
    """
    # Fixed typo in the log message: "occured" -> "occurred".
    _logger.error("An error occurred while processing entry: %s", entry)
    subject.sigStateChanged.emit({'id': entry, 'state': 'error'})
# Start a task for each entry
for entry in entries:
subject.sigStateChanged.emit({'id': entry, 'state': 'started'})
entry_f = xsocsH5.object_filename(entry)
results[entry] = pool.apply_async(
_getIntensity,
args=(xsocsH5.object_filename(entry),),
callback=functools.partial(callback, subject, entry),
error_callback=functools.partial(error_cb, subject, entry))
args = (entry,
entry_f,
projectLock,
projectFile,
pathTpl,
queue,)
results[entry] = pool.apply_async(_getIntensity,
args)
pool.close()
while results:
try:
msg = queue.get(True, 0.01)
if msg['done']:
del results[msg['id']]
subject.sigStateChanged.emit(msg)
except queues.Empty:
pass
# Wait for all tasks to be complete while running Qt event loop
while [res for res in results.values() if not res.ready()]:
app.processEvents()
# Make sure all tasks are done, but that should already be the case
pool.join()
# Saving result to file
for entry, result in results.items():
dsetPath = pathTpl.format(str(entry))
# WARNING, make sure the file isn't opened in write mode elsewhere!!!
if result.successful():
IntensityItem(projectFile, dsetPath, mode='r+', data=result.get())
else:
_logger.error("Intensity computation failed for entry: %s", entry)
mw.close()
mw.deleteLater()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment