Commit 8123f59f authored by payno's avatar payno

Merge branch 'add_thread_ftseries' into 'master'

Add thread ftseries

See merge request !65
parents 309eef6f d6eea07e
......@@ -32,19 +32,24 @@ __date__ = "11/04/2017"
from silx.gui import qt
from orangecontrib.esrf.core.utils.Singleton import Singleton
from orangecontrib.esrf.web.client import OWClient
import subprocess
import sys
import time
import os
import logging
logger = logging.getLogger(__file__)
@Singleton
class RSyncManager(qt.QObject):
class RSyncManager(OWClient, qt.QObject):
"""docstring for RSyncManager"""
WAITING_TIME = 1
def __init__(self):
qt.QObject.__init__(self)
OWClient.__init__(self, logger)
self.threads = {}
def syncFolder(self, source, target, useThread=True, delete=False,
......@@ -61,13 +66,13 @@ class RSyncManager(qt.QObject):
:param bool parallel: True if we want to launch rsync in parallel mode.
"""
# TODO : add a link with verbose. Replace -r per -a
options = ['-r', '-t']
options = ['--recursive', '--times']
if delete is True:
options.append('--delete')
options.append('--remove-source-files')
if verbose is True:
options.append('-v')
options.append('--verbose')
return self.syncFolderRaw(source=source,
target=target,
options=options,
......@@ -90,10 +95,10 @@ class RSyncManager(qt.QObject):
:param bool parallel: True if we want to launch rsync in parallel mode.
"""
if useThread is True:
thread = RSynchThread(source=source,
target=target,
options=options,
parallel=parallel)
thread = RSyncThread(source=source,
target=target,
options=options,
parallel=parallel)
self.threads[(source, target)] = thread
if handler is not None:
......@@ -119,7 +124,7 @@ class RSyncManager(qt.QObject):
if not sys.platform.startswith('linux'):
return False
try:
subprocess.call(["rsync", "--version"])
subprocess.call(["rsync", "--version"], stdout = subprocess.PIPE)
return True
except OSError:
return False
......@@ -140,12 +145,12 @@ class RSyncManager(qt.QObject):
def canUseParallel():
"""True if we can use rsync in parallel mode"""
try:
subprocess.call(["parallel", "--version"])
subprocess.call(["parallel", "--version"], stdout = subprocess.PIPE)
return True
except OSError:
return False
class RSynchThread(qt.QThread):
class RSyncThread(OWClient, qt.QThread):
"""Thread dealing with synchronisation
"""
......@@ -153,7 +158,8 @@ class RSynchThread(qt.QThread):
"""Signal emitted when the synchronization is over"""
def __init__(self, source, target, options, parallel):
super(RSynchThread, self).__init__()
OWClient.__init__(self, logger)
qt.QThread.__init__(self)
self._source = source
self._target = target
self._options = options
......@@ -166,4 +172,33 @@ class RSynchThread(qt.QThread):
options=self._options,
parallel=self._parallel)
subprocess.call(command, shell=True, stdout=subprocess.PIPE)
# if delete action have been requested:
if '--remove-source-files' in self._options:
if RSyncThread.removeEmptyFolders(self._source) is False:
mess = 'fail to remove file on %s.' % self._source
mess += 'Synchronisation might have failed'
logger.error(mess)
raise OSError(mess)
self.syncFinished.emit(self._source, self._target)
@staticmethod
def removeEmptyFolders(folder):
assert(os.path.isdir(folder))
subFiles = os.listdir(folder)
if len(subFiles) == 0:
os.rmdir(folder)
return True
else:
for subFile in subFiles:
subFolder = os.path.join(folder, subFile)
if not os.path.isdir(subFolder):
return False
else:
if not RSyncThread.removeEmptyFolders(subFolder):
return False
os.rmdir(folder)
return True
\ No newline at end of file
This diff is collapsed.
......@@ -38,15 +38,16 @@ import logging
import os
from orangecontrib.esrf.core import tomodir
from orangecontrib.esrf.core.RSyncManager import RSyncManager
from orangecontrib.esrf.web.client import OWClient
import time
_logger = logging.getLogger("tomodirThread")
logger = logging.getLogger("tomodirThread")
TIME_OUT_DURATION = 200 # in second
DEFAULT_TIME_OUT_DURATION = 200 # in second
class TomoDirObserver(qt.QThread):
class TomoDirObserver(OWClient, qt.QThread):
"""Thread launching the tomodir process (observation of acquisition)
:param str headDir: the root dir to make to fin dacquisition
......@@ -59,6 +60,8 @@ class TomoDirObserver(qt.QThread):
and for all .edf file to be copied
:param str srcPattern: see tomodir
:param str destPattern: see tomodir
:param int timeout: if found an acquisition running, will wait for this
duration until moving to an other folder
"""
sigStatusChanged = qt.Signal(tuple)
......@@ -67,14 +70,17 @@ class TomoDirObserver(qt.QThread):
def __init__(self, headDir=None, startByOldest=False,
funcAdvancementHandler=None, waitForXML=False,
srcPattern=None, destPattern=None):
super(qt.QThread, self).__init__()
srcPattern=None, destPattern=None,
timeout=DEFAULT_TIME_OUT_DURATION):
OWClient.__init__(self, logger=logger)
qt.QThread.__init__(self)
self.setHeadFolder(headDir)
self.setStartByOldest(startByOldest)
self.funcAdvancementHandler = funcAdvancementHandler
self.waitForXML = waitForXML
self.srcPattern = srcPattern
self.destPattern = destPattern
self.timeout = timeout
self.tomodirProcess = None
self.reset()
......@@ -105,16 +111,16 @@ class TomoDirObserver(qt.QThread):
def run(self):
if not os.path.isdir(self.headDir):
_logger.info('can\'t observe %s, not a directory'%self.headDir)
logger.info('can\'t observe %s, not a directory'%self.headDir)
return
if self.status in ('waiting for acquisition ending', 'started'):
if self.isTimeOut() or not os.path.isdir(self.lastObsDir):
self.resetTimeOut()
dirToObserve = self.headDir
self.startTimeOutCount()
self.dirToObserve = self.headDir
else:
dirToObserve = self.lastObsDir
self.dirToObserve = self.lastObsDir
else:
dirToObserve = self.headDir
self.dirToObserve = self.headDir
# if status is observing check timeout
# if timeout is exceeded then do not check this folder on the next
......@@ -122,7 +128,7 @@ class TomoDirObserver(qt.QThread):
# if not exceeded then do not change the observe directory if headdir
# not changed.
# otherwise default behavior : observe the head dir
self.tomodirProcess = tomodir.TomoDir(dataDir=dirToObserve,
self.tomodirProcess = tomodir.TomoDir(dataDir=self.dirToObserve,
waitXML=self.waitForXML,
srcPattern=self.srcPattern,
destPattern=self.destPattern)
......@@ -141,7 +147,7 @@ class TomoDirObserver(qt.QThread):
nextd = self.tomodirProcess.dir_explore()
except Exception as e:
self._updateStatus(('failure', self.headDir))
_logger.error('fail to explore dir %s'%self.headDir)
logger.error('fail to explore dir %s'%self.headDir)
raise e
if nextd is True:
......@@ -151,10 +157,12 @@ class TomoDirObserver(qt.QThread):
if validation == 1:
# signal status of acquisition
self.resetTimeOut()
self._updateStatus(('acquisition ended', self.tomodirProcess.parsing_dir))
elif validation == 0:
self._updateStatus(('waiting for acquisition ending', self.tomodirProcess.parsing_dir))
elif validation == -1:
self.resetTimeOut()
self._updateStatus(('not processing', 'observation stopped'))
def setSourceToDestinationPatterns(self, srcPattern, destPattern):
......@@ -184,7 +192,7 @@ class TomoDirObserver(qt.QThread):
assert(type(status) in (list, tuple))
assert(status[0] in tomodir.OBSERVATION_STATUS)
info = "new status of the observation " + status[0]
_logger.info(info)
logger.info(info)
if status[0] == 'waiting for acquisition ending':
# if this is a new observation
if self.lastObsDir != status[1]:
......@@ -194,7 +202,7 @@ class TomoDirObserver(qt.QThread):
self.sigStatusChanged.emit(status)
def _updateInformation(self, info):
_logger.info(info)
logger.info(info)
self.sigInformation.emit(info)
def startTimeOutCount(self):
......@@ -207,8 +215,7 @@ class TomoDirObserver(qt.QThread):
if self.startTime is None:
return False
return (time.time() - self.startTime) > TIME_OUT_DURATION
return (time.time() - self.startTime) > self.timeout
def setTimeOut(self, t):
global TIME_OUT_DURATION
TIME_OUT_DURATION = t
def setTimeOut(self, timeout):
self.timeout = timeout
# coding: utf-8
# /*##########################################################################
#
# Copyright (c) 2017 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
__authors__ = ["H. Payno"]
__license__ = "MIT"
__date__ = "09/06/2017"
from orangecontrib.esrf.core.TomoDirObserver import TomoDirObserver
from orangecontrib.esrf.test.utils import UtilsTest
import unittest
import tempfile
import shutil
import os
import time
class TestTimeOut(unittest.TestCase):
"""Simple test to make sure the timeout of tomodir is working properly"""
@classmethod
def setUpClass(cls):
# create a folder with an unfinished acquisition
cls.observeFolder = tempfile.mkdtemp()
cls.dataSetID = 'test10'
dataDir = UtilsTest.getDataset(cls.dataSetID)
shutil.copytree(dataDir, os.path.join(cls.observeFolder, cls.dataSetID))
fileName = cls.dataSetID + '.xml'
filePath = os.path.join(cls.observeFolder, cls.dataSetID, fileName)
assert(os.path.isfile(filePath))
os.remove(filePath)
# observer
cls.observer = TomoDirObserver(headDir=cls.observeFolder,
startByOldest=False,
funcAdvancementHandler=None,
waitForXML=True,
srcPattern=None,
destPattern=None,
timeout=1)
def testTimeOut(self):
"""Make sure time out is valide.
Should find the acquisition then should move up to the top level
directory if time is out...
"""
# intially observe top level dir
self.observer.start()
self.observer.wait()
self.assertTrue(
self.observer.dirToObserve == self.observeFolder)
# after one iteration, found the DataSetID, wait until time out
self.observer.start()
self.observer.wait()
self.assertTrue(
self.observer.dirToObserve == os.path.join(self.observeFolder, self.dataSetID))
# if time out exceeded, start back from the top level
time.sleep(1.1)
self.observer.start()
self.observer.wait()
self.assertTrue(self.observer.dirToObserve == self.observeFolder)
# then found again the newest (previous one)
self.observer.start()
self.observer.wait()
self.assertTrue(
self.observer.dirToObserve == os.path.join(self.observeFolder, self.dataSetID))
# time out exceeded again
time.sleep(1.1)
self.observer.start()
self.observer.wait()
self.assertTrue(self.observer.dirToObserve == self.observeFolder)
def suite():
test_suite = unittest.TestSuite()
for ui in (TestTimeOut,):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
if __name__ == '__main__':
unittest.main(defaultTest="suite")
\ No newline at end of file
......@@ -44,6 +44,7 @@ from silx.third_party.EdfFile import EdfFile
from silx.gui import qt
from orangecontrib.esrf.core.FastSetupDefineGlobals import *
from orangecontrib.esrf.core.RSyncManager import RSyncManager
from orangecontrib.esrf.web.client import OWClient
logger = logging.getLogger(__name__)
......@@ -134,7 +135,7 @@ OBSERVATION_STATUS = {
" Possible status of tomodir"
class TomoDir(qt.QObject):
class TomoDir(OWClient, qt.QObject):
sigNbDirExplored = qt.Signal(int)
sigAdvanceExploration = qt.Signal(int)
......@@ -174,6 +175,7 @@ class TomoDir(qt.QObject):
:param str destPattern: the pattern that will replace srcPattern in the
scan path
"""
OWClient.__init__(self, logger)
qt.QObject.__init__(self)
self.RootDir = dataDir
self.parsing_dir = dataDir
......@@ -236,7 +238,7 @@ class TomoDir(qt.QObject):
# if on lbsram, .info file is already in dest dir ( /data/visitor...). Copy it
if self.srcPattern is not None:
if not os.path.isdir(self.srcPattern):
if self.srcPattern != '' and not os.path.isdir(self.srcPattern):
warn = "given split pattern %s doesn't exists. Can't apply it" % self.srcPattern
logger.warning(warn)
else:
......@@ -367,7 +369,7 @@ class TomoDir(qt.QObject):
xmlfilelbsram = os.path.join(self.RootDir, self.parsing_dir, aux[len(aux)-1]+self.file_xml_ext)
# if on lbsram, .xml file is copy in /data/visitor...
if self.srcPattern is None:
return os.path.isfile(xmlfilenice)
return os.path.isfile(xmlfilelbsram)
else:
xmlfilenice = xmlfilelbsram.replace(self.srcPattern, self.destPattern)
return os.path.isfile(xmlfilenice) or os.path.isfile(xmlfilelbsram)
......
......@@ -30,10 +30,13 @@ __date__ = "20/01/2017"
import os
from orangecontrib.esrf.core.ftseries import FtserieReconstruction
from orangecontrib.esrf.core.FastSetupDefineGlobals import FastSetupAll
from silx.io.octaveh5 import Octaveh5
import shutil
from glob import glob
import re
import logging
logger = logging.getLogger(__file__)
def getRadioPaths(scanID):
"""Return the dict of radios for the given scanID.
......@@ -255,3 +258,31 @@ def tryToFindH5File(folder, politic):
return result
def saveH5File(structs, h5File, targettedOctaveVersion, displayInfo=True):
"""Function to write the reconstruction parameters into the h5 file
:param dict structs: the reconstruction parameters
:param str h5File: the path to the file to create
:param float targettedOctaveVersion: the octave version which will read the
h5 file
:param bool displayInfo: add information in the log ?
"""
if not h5File.lower().endswith('.h5'):
h5File = h5File + '.h5'
# check that the file exists
if displayInfo is True:
mess = 'try to save .h5 file ()%s ...' % h5File
logger.info(mess)
writer = Octaveh5(targettedOctaveVersion)
try:
writer.open(h5File, 'w')
for structID in structs:
writer.write(structID, structs[structID])
finally:
writer.close()
\ No newline at end of file
......@@ -31,18 +31,21 @@ __date__ = "24/01/2017"
from orangecontrib.esrf.widgets.FolderTransfertWidget import FolderTransfertWidget
from orangecontrib.esrf.core.qtApplicationManager import QApplicationManager
from glob import glob
from orangecontrib.esrf.core.RSyncManager import RSyncManager
from silx.gui.test.utils import TestCaseQt
import unittest
import os
import tempfile
import shutil
import logging
import time
_qapp = QApplicationManager.getInstance()
logging.disable(logging.INFO)
class TestFolderTransfert(unittest.TestCase):
class TestFolderTransfert(TestCaseQt):
"""Simple unit test to test the start/stop observation button action"""
def setUp(self):
......@@ -87,18 +90,40 @@ class TestFolderTransfert(unittest.TestCase):
if os.path.isdir(self.targettedir):
shutil.rmtree(self.targettedir)
# TODO : do the same test if no rsync to make sure previous cross platform
# functions are ok
@unittest.skipIf(RSyncManager.getInstance().canUseRSync() is False,
"Can't access rsync")
def testMoveFiles(self):
"""simple test that files are moved"""
self.folderTransWidget._transfertLocally(self.sourcedir, move=True)
# since RSync is using a thread we need to wait.
# alternative will be to link with RSyncThread.syncFinished
outputdir = os.path.join(self.targettedir, os.path.basename(self.sourcedir))
timeout = 1
while(os.path.isdir(self.sourcedir)) and timeout > 0:
timeout = timeout - 0.1
time.sleep(0.1)
self.assertTrue(os.path.isdir(outputdir))
self.assertTrue(self.checkDataCopied())
@unittest.skipIf(RSyncManager.getInstance().canUseRSync() is False,
"Can't access rsync")
def testCopyFiles(self):
"""Simple test that file are copy and deleted"""
self.folderTransWidget._transfertLocally(self.sourcedir, move=False)
# since RSync is using a thread we need to wait.
# alternative will be to link with RSyncThread.syncFinished
timeout = 1
outputdir = os.path.join(self.targettedir, os.path.basename(self.sourcedir))
while(os.path.isdir(self.sourcedir)) and timeout > 0:
timeout = timeout - 0.1
time.sleep(0.1)
self.assertTrue(os.path.isdir(outputdir))
self.assertTrue(self.checkDataCopied())
......@@ -107,16 +132,23 @@ class TestFolderTransfert(unittest.TestCase):
assert(not os.path.isdir(out))
os.mkdir(out)
assert(os.path.isdir(out))
self.folderTransWidget._transfertLocally(self.sourcedir, move=True, force=False)
self.folderTransWidget._transfertLocally(self.sourcedir,
move=True,
force=False,
noRsync=True)
self.setFiles()
with self.assertRaises(shutil.Error):
self.assertRaises(
self.folderTransWidget._transfertLocally(self.sourcedir,
move=True,
force=False))
force=False,
noRsync=True))
self.folderTransWidget._transfertLocally(self.sourcedir, move=True, force=True)
self.folderTransWidget._transfertLocally(self.sourcedir,
move=True,
force=True,
noRsync=True)
self.assertTrue(self.checkDataCopied())
def testCopyFilesForce(self):
......@@ -124,7 +156,10 @@ class TestFolderTransfert(unittest.TestCase):
assert(not os.path.isdir(out))
os.mkdir(out)
assert(os.path.isdir(out))
self.folderTransWidget._transfertLocally(self.sourcedir, move=False, force=False)
self.folderTransWidget._transfertLocally(self.sourcedir,
move=False,
force=False,
noRsync=True)
self.assertTrue(self.checkDataCopied())
shutil.copytree(out, self.sourcedir)
......@@ -132,9 +167,13 @@ class TestFolderTransfert(unittest.TestCase):
self.assertRaises(
self.folderTransWidget._transfertLocally(self.sourcedir,
move=False,
force=False))
force=False,
noRsync=True))
self.folderTransWidget._transfertLocally(self.sourcedir, move=False, force=True)
self.folderTransWidget._transfertLocally(self.sourcedir,
move=False,
force=True,
noRsync=True)
self.assertTrue(self.checkDataCopied())
def checkDataCopied(self):
......
......@@ -92,7 +92,7 @@ class TestSimplifyH5EditorDisplay(TestCaseQt):
self.assertFalse(widget._qcbKeepBone.isChecked())
def testPYHST(self):
"""Test that the display of the PyHST widget is correct"""
"""test that the display of the PyHST widget is correct"""
widget = self.h5Editor._PyHSTWidget
widget.load(self.input.structures)
......@@ -117,7 +117,7 @@ class TestSimplifyH5EditorDisplay(TestCaseQt):
self.assertTrue(widget._qleVerboseFile.text() == 'pyhst_out.txt')
def testBeamGEO(self):
"""Test that the display of the BeamGeo widget is correct"""
"""test that the display of the BeamGeo widget is correct"""
widget = self.h5Editor._beamGeoWidget
widget.load(self.input.structures)
......@@ -273,7 +273,7 @@ class TestSimplifyH5EditorDisplay(TestCaseQt):
class TestSimplifyH5EditorSave(TestCaseQt):
"""Test that h5editor is returning the correct H5 structures"""
"""test that h5editor is returning the correct H5 structures"""
def setUp(self):
super(TestSimplifyH5EditorSave, self).setUp()
......
......@@ -29,6 +29,7 @@ __date__ = "24/01/2017"
from orangecontrib.esrf.core.qtApplicationManager import QApplicationManager
from orangecontrib.esrf.test.OrangeWorkflowTest import OrangeWorflowTest
from orangecontrib.esrf.core.FastSetupDefineGlobals import FastSetupAll
from orangecontrib.esrf.core.ReconstructionStack import _ReconsFtSeriesThread
from orangecontrib.esrf.test.utils import UtilsTest
import logging
import os
......@@ -84,10 +85,10 @@ class TestScanListFTSerieWorkflow(OrangeWorflowTest):
cls.scanListWidget = cls.getWidgetForNode(cls, nodeScanList)
cls.ftserieWidget = cls.getWidgetForNode(cls, nodeFTSerie)
cls.ftserieWidget.copyH5FileReconsIntoFolder = True
_ReconsFtSeriesThread.setCopyH5FileReconsIntoFolder(True)
# Set we only want to simulate the reconstruction
cls.ftserieWidget.setMockMode(True)
cls.ftserieWidget.reconsStack.setMockMode(True)
@classmethod
def tearDownClass(cls):
......@@ -147,92 +148,94 @@ class TestScanListFTSerieWorkflow(OrangeWorflowTest):
def setH5Exploration(self, b):
"""Activate or not the exploration"""
self.ftserieWidget.setH5Exploration(b)
def testListH5Exploration(self):