Commit 4205cdcc authored by payno's avatar payno

reformat files using black

parent 1a01f2ed
Pipeline #31652 failed with stages
in 4 minutes and 1 second
This diff is collapsed.
......@@ -36,12 +36,15 @@ __date__ = "06/12/2019"
import logging
logging.basicConfig()
import sys
from silx.utils.launcher import Launcher
try:
import est._version
import_version = True
except ImportError:
import_version = False
......@@ -57,20 +60,21 @@ def main():
:returns: The execution status
"""
if import_version is False:
_version = '???'
_version = "???"
else:
_version = est._version.version
launcher = Launcher(prog="est", version=_version)
launcher.add_command("canvas",
module_name="est.app.canvas",
description="open the orange-canvas")
launcher.add_command("process",
module_name="est.app.process",
description="process a workflow from a .xml.ows file"
"description")
launcher.add_command("test",
module_name="est.app.test_",
description="Launch est unittest")
launcher.add_command(
"canvas", module_name="est.app.canvas", description="open the orange-canvas"
)
launcher.add_command(
"process",
module_name="est.app.process",
description="process a workflow from a .xml.ows file" "description",
)
launcher.add_command(
"test", module_name="est.app.test_", description="Launch est unittest"
)
status = launcher.execute(sys.argv)
return status
......
from .launcher import Launcher
\ No newline at end of file
from .launcher import Launcher
......@@ -20,9 +20,7 @@ class EstConfig(config.Config):
@staticmethod
def core_packages():
return super(EstConfig, EstConfig).core_packages() + [
"est-add-on",
]
return super(EstConfig, EstConfig).core_packages() + ["est-add-on"]
@staticmethod
def application_icon():
......@@ -31,16 +29,26 @@ class EstConfig(config.Config):
class EstSplashScreen(qt.QSplashScreen):
"""SplashScreen to overwrite the one of Orange"""
def __init__(self, parent=None, pixmap=None, textRect=None,
textFormat=qt.Qt.PlainText, **kwargs):
def __init__(
self,
parent=None,
pixmap=None,
textRect=None,
textFormat=qt.Qt.PlainText,
**kwargs
):
qt.QSplashScreen.__init__(self, parent, pixmap=pixmap, **kwargs)
def showMessage(self, message, alignment=qt.Qt.AlignLeft, color=qt.Qt.black):
super().showMessage(version(), qt.Qt.AlignCenter | qt.Qt.AlignBottom, qt.QColor("#e68f17"))
super().showMessage(
version(), qt.Qt.AlignCenter | qt.Qt.AlignBottom, qt.QColor("#e68f17")
)
class Launcher:
"""Proxy to orange-canvas"""
def launch(self, argv):
config.Config = EstConfig
self.fix_application_dirs()
......@@ -91,4 +99,5 @@ class Launcher:
def main(self, argv):
from Orange.canvas.__main__ import main
main(argv)
......@@ -11,7 +11,7 @@ def splash_screen():
note: QRect is used by orange to define a mask to display the message.
In our case we overwrite the QSplashScreen so we don't need this.
"""
pixmap = icons.getQPixmap('est')
pixmap = icons.getQPixmap("est")
return pixmap, qt.QRect(0, 0, 0, 0)
......@@ -21,5 +21,5 @@ def getIcon():
:return: application icon
:rtype: QIcon
"""
pixmap = icons.getQPixmap('est')
pixmap = icons.getQPixmap("est")
return qt.QIcon(pixmap)
......@@ -2,8 +2,10 @@ import argparse
import sys
from ..pushworkflow.scheme.parser import scheme_load
import logging
try:
import h5py
has_h5py = True
except:
has_h5py = False
......@@ -17,23 +19,24 @@ def exec_(scheme, input_=None, output_=None):
if input_:
# monkey patch the input file for start nodes if an input is given
for node in scheme.nodes:
if node.properties and '_input_file_setting' in node.properties:
node.properties['_input_file_setting'] = input_
if node.properties and "_input_file_setting" in node.properties:
node.properties["_input_file_setting"] = input_
manage_input()
def manage_output():
if output_:
# monkey patch the input file for start nodes if an input is given
for node in scheme.nodes:
if node.properties and '_output_file_setting' in node.properties:
node.properties['_output_file_setting'] = output_
if node.properties and "_output_file_setting" in node.properties:
node.properties["_output_file_setting"] = output_
# TODO: if the input file contains configuration, then overwrite the node
# properties...
manage_output()
if len(scheme._start_actor.listDownStreamActor) is 0:
_logger.warning('no downstream node defined')
_logger.warning("no downstream node defined")
return None
scheme._start_actor.trigger(input_)
scheme._end_actor.join()
......@@ -51,18 +54,27 @@ def getInputFrmHdf5(file_path):
def main(argv):
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'workflow_file',
help='Workflow file providing the workflow description (.ows, .xml)')
parser.add_argument("-i", dest="input_", default=None,
help="Input of the workflow. require at most one "
"instance of XASInputOW")
parser.add_argument("-o", dest="output_", default=None,
help="Output file of the workflow. Require at most one "
"instance of XASOutputOW")
"workflow_file",
help="Workflow file providing the workflow description (.ows, .xml)",
)
parser.add_argument(
"-i",
dest="input_",
default=None,
help="Input of the workflow. require at most one " "instance of XASInputOW",
)
parser.add_argument(
"-o",
dest="output_",
default=None,
help="Output file of the workflow. Require at most one "
"instance of XASOutputOW",
)
options = parser.parse_args(argv[1:])
scheme = scheme_load(_file=options.workflow_file)
exec_(scheme, options.input_, options.output_)
if __name__ == "__main__":
main(sys.argv)
......@@ -3,8 +3,10 @@ import sys
import logging
from est.core.types import XASObject
from est.core.reprocessing import get_process_instance_frm_h5_desc
try:
import h5py
has_h5py = True
except:
has_h5py = False
......@@ -13,26 +15,29 @@ logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger(__name__)
def exec_(h5_file, entry='scan1', spectra_path=None, energy_path=None,
configuration_path=None):
def exec_(
h5_file, entry="scan1", spectra_path=None, energy_path=None, configuration_path=None
):
try:
xas_obj = XASObject.from_file(h5_file=h5_file,
entry=entry or 'scan1',
spectra_path=spectra_path or 'data/absorbed_beam',
energy_path=energy_path or 'data/energy',
configuration_path=configuration_path)
xas_obj = XASObject.from_file(
h5_file=h5_file,
entry=entry or "scan1",
spectra_path=spectra_path or "data/absorbed_beam",
energy_path=energy_path or "data/energy",
configuration_path=configuration_path,
)
xas_obj.link_to_h5(h5_file=h5_file)
except Exception as e:
_logger.error(e)
xas_obj = None
if not xas_obj:
_logger.error('Unable to create an object from the given file and path')
_logger.error("Unable to create an object from the given file and path")
xas_processes = xas_obj.get_process_flow()
if len(xas_processes) == 0:
_logger.warning('no xas process found in the given h5 file')
_logger.warning("no xas process found in the given h5 file")
return
xas_obj.clean_process_flow()
for i_process, process in xas_processes.items():
......@@ -53,21 +58,39 @@ def getInputFrmHdf5(file_path):
def main(argv):
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'workflow_file',
help='Workflow file providing the workflow description (.h5)')
parser.add_argument("--spectra_path", dest="spectra_path", default=None,
help="path to the spectra data")
parser.add_argument("--energy_path", dest="energy_path", default=None,
help="path to the energy data")
parser.add_argument("--configuration_path", dest="configuration_path", default=None,
help="path to the process configuration")
parser.add_argument("--entry", dest="entry to load",
default=None,
help="path to the process configuration")
"workflow_file", help="Workflow file providing the workflow description (.h5)"
)
parser.add_argument(
"--spectra_path",
dest="spectra_path",
default=None,
help="path to the spectra data",
)
parser.add_argument(
"--energy_path",
dest="energy_path",
default=None,
help="path to the energy data",
)
parser.add_argument(
"--configuration_path",
dest="configuration_path",
default=None,
help="path to the process configuration",
)
parser.add_argument(
"--entry",
dest="entry to load",
default=None,
help="path to the process configuration",
)
options = parser.parse_args(argv[1:])
exec_(h5_file=options.workflow_file, spectra_path=options.spectra_path,
energy_path=options.energy_path,
configuration_path=options.configuration_path)
exec_(
h5_file=options.workflow_file,
spectra_path=options.spectra_path,
energy_path=options.energy_path,
configuration_path=options.configuration_path,
)
if __name__ == "__main__":
......
......@@ -34,6 +34,7 @@ from est.core.utils import DownloadDataset
from ...pushworkflow.scheme.parser import scheme_load
from ..process import exec_
import os
try:
import PyMca5
except ImportError:
......@@ -43,37 +44,38 @@ else:
from PyMca5.PyMcaDataDir import PYMCA_DATA_DIR
@unittest.skipIf(has_pymca is False, 'PyMca5 is not installed')
@unittest.skipIf(has_pymca is False, "PyMca5 is not installed")
class TestWorkflowFromOwsFile(unittest.TestCase):
"""test construction of XAS object"""
def setUp(self):
self.outputdir = tempfile.mkdtemp()
file_ = 'pymca_workflow_2.ows'
DownloadDataset(dataset=file_,
output_folder=self.outputdir,
timeout=10.0)
file_ = "pymca_workflow_2.ows"
DownloadDataset(dataset=file_, output_folder=self.outputdir, timeout=10.0)
self.orange_file = os.path.join(self.outputdir, file_)
self.input_file1 = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
self.output_file = os.path.join(self.outputdir, 'output.h5')
self.output_file = os.path.join(self.outputdir, "output.h5")
def tearDown(self):
shutil.rmtree(self.outputdir)
def testPyMcaWorkflow(self):
"""Test regarding the instantiation of the pymcaXAS"""
exec_(scheme=scheme_load(self.orange_file),
input_=self.input_file1,
output_=self.output_file)
exec_(
scheme=scheme_load(self.orange_file),
input_=self.input_file1,
output_=self.output_file,
)
self.assertTrue(os.path.exists(self.output_file))
# TODO: check the values in the output file
def suite():
test_suite = unittest.TestSuite()
for ui in (TestWorkflowFromOwsFile, ):
for ui in (TestWorkflowFromOwsFile,):
test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ui))
return test_suite
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main(defaultTest="suite")
......@@ -38,6 +38,7 @@ from est.core.utils import spectra as spectra_utils
import urllib.request
from est.core.types import Spectrum
from ..reprocessing import exec_
try:
import PyMca5
except ImportError:
......@@ -62,17 +63,19 @@ else:
from est.io.utils.larch import read_ascii
@unittest.skipIf(has_pymca is False, 'PyMca5 is not installed')
@unittest.skipIf(has_pymca is False, "PyMca5 is not installed")
class TestReprocessingPyMca(unittest.TestCase):
"""test reprocessing work for some pymca process"""
def setUp(self):
self.output_dir = tempfile.mkdtemp()
data_file = os.path.join(PYMCA_DATA_DIR, "EXAFS_Cu.dat")
energy, mu = read_spectrum(data_file)
self.spectrum = Spectrum(energy=energy, mu=mu)
self.xas_obj_ref = XASObject(spectra=(self.spectrum, ), energy=energy,
dim1=1, dim2=1)
self.h5_file = os.path.join(self.output_dir, 'output_file.h5')
self.xas_obj_ref = XASObject(
spectra=(self.spectrum,), energy=energy, dim1=1, dim2=1
)
self.h5_file = os.path.join(self.output_dir, "output_file.h5")
out = PyMca_normalization()(xas_obj=self.xas_obj_ref)
out = PyMca_exafs()(xas_obj=out)
out = PyMca_k_weight()(xas_obj=out)
......@@ -91,37 +94,42 @@ class TestReprocessingPyMca(unittest.TestCase):
def test(self):
res_xas_obj = exec_(self.h5_file)
self.assertEqual(res_xas_obj, self.xas_obj_ref)
numpy.testing.assert_allclose(res_xas_obj.spectra[0].ft.intensity,
self.xas_obj_ref.spectra[0].ft.intensity)
numpy.testing.assert_allclose(
res_xas_obj.spectra[0].ft.intensity,
self.xas_obj_ref.spectra[0].ft.intensity,
)
@unittest.skipIf(has_larch is False, 'Larch is not installed')
@unittest.skipIf(has_larch is False, "Larch is not installed")
class TestReprocessingLarch(unittest.TestCase):
"""test reprocessing work for some larch process"""
def setUp(self):
self.output_dir = tempfile.mkdtemp()
xmu_url = "https://raw.githubusercontent.com/xraypy/xraylarch/master/examples/xafs/cu_rt01.xmu"
self.data_file = os.path.join(self.output_dir, 'cu_rt01.xmu')
self.data_file = os.path.join(self.output_dir, "cu_rt01.xmu")
with urllib.request.urlopen(xmu_url) as response, open(self.data_file, 'wb') as out_file:
with urllib.request.urlopen(xmu_url) as response, open(
self.data_file, "wb"
) as out_file:
data = response.read() # a `bytes` object
out_file.write(data)
assert os.path.exists(self.data_file)
energy, mu = read_ascii(self.data_file)
self.spectrum = Spectrum(energy=energy, mu=mu)
spectra = (self.spectrum, )
self.xas_obj_ref = XASObject(spectra=spectra, energy=energy, dim1=1,
dim2=1)
self.h5_file = os.path.join(self.output_dir, 'output_file.h5')
spectra = (self.spectrum,)
self.xas_obj_ref = XASObject(spectra=spectra, energy=energy, dim1=1, dim2=1)
self.h5_file = os.path.join(self.output_dir, "output_file.h5")
pre_edge_process = Larch_pre_edge()
pre_edge_process.setProperties({'rbkg': 1.0, 'kweight': 2})
pre_edge_process.setProperties({"rbkg": 1.0, "kweight": 2})
autobk_process = Larch_autobk()
autobk_process.setProperties({'kmin': 2, 'kmax': 16, 'dk': 3,
'window': 'hanning', 'kweight': 2})
autobk_process.setProperties(
{"kmin": 2, "kmax": 16, "dk": 3, "window": "hanning", "kweight": 2}
)
xftf_process = Larch_xftf()
xftf_process.setProperties({'kweight': 2})
xftf_process.setProperties({"kweight": 2})
out = pre_edge_process(xas_obj=self.xas_obj_ref)
out = autobk_process(xas_obj=out)
......@@ -138,8 +146,9 @@ class TestReprocessingLarch(unittest.TestCase):
def test(self):
res_xas_obj = exec_(self.h5_file)
self.assertEqual(res_xas_obj, self.xas_obj_ref)
numpy.testing.assert_allclose(res_xas_obj.spectra[0].chir_mag,
self.xas_obj_ref.spectra[0].chir_mag)
numpy.testing.assert_allclose(
res_xas_obj.spectra[0].chir_mag, self.xas_obj_ref.spectra[0].chir_mag
)
def suite():
......@@ -149,5 +158,5 @@ def suite():
return test_suite
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main(defaultTest="suite")
......@@ -93,16 +93,29 @@ def main(argv):
from silx.test import utils
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("-v", "--verbose", default=0,
action="count", dest="verbose",
help="Increase verbosity. Option -v prints additional " +
"INFO messages. Use -vv for full verbosity, " +
"including debug messages and test help strings.")
parser.add_argument("--qt-binding", dest="qt_binding", default=None,
help="Force using a Qt binding: 'PyQt5' or 'PySide2'")
parser.add_argument("--web", dest="web_log", default=False,
help="Force unit test to export his log to graylog'",
action="store_true")
parser.add_argument(
"-v",
"--verbose",
default=0,
action="count",
dest="verbose",
help="Increase verbosity. Option -v prints additional "
+ "INFO messages. Use -vv for full verbosity, "
+ "including debug messages and test help strings.",
)
parser.add_argument(
"--qt-binding",
dest="qt_binding",
default=None,
help="Force using a Qt binding: 'PyQt5' or 'PySide2'",
)
parser.add_argument(
"--web",
dest="web_log",
default=False,
help="Force unit test to export his log to graylog'",
action="store_true",
)
utils.test_options.add_parser_argument(parser)
options = parser.parse_args(argv[1:])
......@@ -149,6 +162,7 @@ def main(argv):
unittest.installHandler()
import est.test
test_suite = unittest.TestSuite()
test_suite.addTest(est.test.suite())
result = runner.run(test_suite)
......
......@@ -30,6 +30,7 @@ from larch.xafs.autobk import autobk
import multiprocessing
import functools
import logging
_logger = logging.getLogger(__name__)
_DEBUG = True
......@@ -37,8 +38,14 @@ if _DEBUG:
from larch.symboltable import Group
def process_spectr_autobk(spectrum, configuration, overwrite=True,
callbacks=None, output=None, output_dict=None):
def process_spectr_autobk(
spectrum,
configuration,
overwrite=True,
callbacks=None,
output=None,
output_dict=None,
):
"""
:param spectrum: spectrum to process
......@@ -56,20 +63,38 @@ def process_spectr_autobk(spectrum, configuration, overwrite=True,
:return: processed spectrum
:rtype: tuple (configuration, spectrum)
"""
_logger.debug('start autobk on spectrum (%s, %s)' % (spectrum.x, spectrum.y))
_logger.debug("start autobk on spectrum (%s, %s)" % (spectrum.x, spectrum.y))
assert isinstance(spectrum, Spectrum)
if spectrum.energy is None or spectrum.mu is None:
_logger.error('Energy and or Mu is/are not specified, unable to '
'compute pre edge')
_logger.error(
"Energy and or Mu is/are not specified, unable to " "compute pre edge"
)
_conf = configuration
if 'autobk' in _conf:
_conf = _conf['autobk']
if "autobk" in _conf:
_conf = _conf["autobk"]
opts = {}
for opt_name in ('rbkg', 'nknots', 'e0', 'edge_step', 'kmin', 'kmax',
'kweight', 'dk', 'win', 'k_std', 'chi_std', 'nfft',
'kstep', 'pre_edge_kws', 'nclamp', 'clamp_lo', 'clamp_hi',
'calc_uncertainties', 'err_sigma'):
for opt_name in (
"rbkg",
"nknots",
"e0",
"edge_step",
"kmin",
"kmax",
"kweight",
"dk",
"win",
"k_std",
"chi_std",
"nfft",
"kstep",
"pre_edge_kws",
"nclamp",
"clamp_lo",
"clamp_hi",
"calc_uncertainties",
"err_sigma",
):
if opt_name in _conf:
opts[opt_name] = _conf[opt_name]
......@@ -110,35 +135,37 @@ _USE_MULTIPROCESSING_POOL = False
class Larch_autobk(Process):
def __init__(self):
Process.__init__(self, name='autobk')
Process.__init__(self, name="autobk")
def setProperties(self, properties):
if '_larchSettings' in properties:
self._settings = properties['_larchSettings']
if "_larchSettings" in properties:
self._settings = properties["_larchSettings"]
def process(self, xas_obj):
_xas_obj = self.getXasObject(xas_obj=xas_obj)
if self._settings:
_xas_obj.configuration['autobk'] = self._settings
_xas_obj.configuration["autobk"] = self._settings
self._advancement.reset(max_=_xas_obj.n_spectrum)
self._advancement.startProcess()
self._pool_process(xas_obj=_xas_obj)
self._advancement.endProcess()
self.register_process(_xas_obj,