Skip to content
Snippets Groups Projects
Commit 80ef3fcb authored by Wout De Nolf's avatar Wout De Nolf
Browse files

create SplitBlissScan task using split_bliss_scan

parent d7ac1e41
No related branches found
No related tags found
1 merge request!178Resolve "Workflow to split multi-XAS scan"
Pipeline #223667 passed
from est.core.split_monotonic import split_bliss_scan
from ewokscore import execute_graph
from ewoksutils.task_utils import task_inputs
if __name__ == "__main__":
    # Example: split a multi-XAS Bliss scan by running the SplitBlissScan
    # Ewoks task in a one-node workflow.
    # NOTE(review): the paths below are beamline example data — adjust locally.
    inputs = {
        "filename": "/data/scisoft/ewoks/ch7280/id24-dcm/20250131/RAW_DATA/Ru_WVC1/Ru_WVC1_1_RT_air/Ru_WVC1_1_RT_air.h5",
        "scan_number": 1,
        "monotonic_channel": "measurement/energy_enc",
        "out_filename": "/tmp/test.h5",
    }
    # Single workflow node that executes the SplitBlissScan task class.
    node = {
        "task_type": "class",
        "task_identifier": "est.core.process.split.SplitBlissScan",
    }
    graph = {"graph": {"graph_version": "1.1", "id": "split"}, "nodes": [node]}
    print(execute_graph(graph, inputs=task_inputs(inputs=inputs)))
from ewokscore.task import Task
from est.core.split_monotonic import split_bliss_scan
class SplitBlissScan(
    Task,
    input_names=["filename", "scan_number", "monotonic_channel", "out_filename"],
    optional_input_names=["retry_timeout", "retry_period"],
    output_names=["out_urls"],
):
    """Ewoks task wrapping :func:`split_bliss_scan`.

    Splits a Bliss scan into sub-scans and exposes the resulting HDF5 URLs
    through the ``out_urls`` output.
    """

    def run(self):
        # Forward every provided input, including the optional retry settings.
        parameters = self.get_input_values()
        self.outputs.out_urls = split_bliss_scan(**parameters)
import logging
from typing import List
import h5py
import numpy
......@@ -12,19 +13,28 @@ _logger = logging.getLogger(__name__)
def split_bliss_scan(
    filename: str,
    scan_number: int,
    monotonic_channel: str,
    out_filename: str,
    **retry_args,
) -> List[str]:
    """Split a Bliss scan into sub-scans over which a channel is monotonic.

    :param filename: HDF5 file containing the Bliss scan.
    :param scan_number: number of the scan to split (entry ``<scan_number>.1``).
    :param monotonic_channel: HDF5 path, relative to the scan entry, of the
        piecewise-monotonic dataset (e.g. ``"measurement/energy_enc"``).
    :param out_filename: HDF5 file in which each sub-scan is saved as an
        entry ``<scan_number>.<subscan_number>``.
    :param retry_args: forwarded to the retry loop waiting for the scan to
        finish (e.g. ``retry_timeout``, ``retry_period``).
    :returns: silx URLs of the saved sub-scan entries.
    """
    entry_name = f"{scan_number}.1"
    # Block until the scan is finished; may raise on retry timeout.
    _wait_scan_finished(filename, entry_name, **retry_args)
    out_urls = []
    with h5py_utils.File(filename) as nxroot_in:
        nxentry_in = nxroot_in[entry_name]
        monotonic_values = nxentry_in[monotonic_channel][()]
        monotonic_slices = split_piecewise_monotonic(monotonic_values)
        # Sub-scan numbering starts at 1, matching the Bliss
        # "<scan>.<subscan>" entry naming convention.
        for subscan_number, subscan_slice in enumerate(monotonic_slices, 1):
            out_url = _save_subscan(
                nxentry_in, scan_number, subscan_number, out_filename, subscan_slice
            )
            out_urls.append(out_url)
    return out_urls
@h5py_utils.retry(retry_timeout=60)
......@@ -38,18 +48,20 @@ def _wait_scan_finished(filename: str, entry_name: str) -> None:
def _save_subscan(
    nxentry_in: h5py.Group,
    scan_number: int,
    subscan_number: int,
    out_filename: str,
    subscan_slice: slice,
) -> str:
    """Save one monotonic slice of a scan as an entry in the output file.

    :param nxentry_in: scan entry to read from.
    :param scan_number: original scan number.
    :param subscan_number: 1-based index of the sub-scan.
    :param out_filename: HDF5 file to write to (created when missing).
    :param subscan_slice: slice selecting the points of this sub-scan.
    :returns: silx URL of the (possibly pre-existing) sub-scan entry.
    """
    entry_name = f"{scan_number}.{subscan_number}"
    out_url = f"silx://{out_filename}::/{entry_name}"
    # track_order=True preserves the creation order of groups/datasets.
    with h5py.File(out_filename, mode="a", track_order=True) as nxroot_out:
        if entry_name in nxroot_out:
            # Never overwrite an existing entry: warn and return its URL.
            _logger.warning("%s::/%s already exists", out_filename, entry_name)
            return out_url
        nxentry_out = nxroot_out.create_group(entry_name)
        _save_subgroup(nxentry_in, nxentry_out, subscan_slice)
    return out_url
def _save_subgroup(
......
......@@ -4,9 +4,17 @@ from silx.io import dictdump
from silx.utils.retry import RetryTimeoutError
from est.core.split_monotonic import split_bliss_scan
from est.core.process.split import SplitBlissScan
def split_bliss_scan_task(**kwargs):
    """Run :class:`SplitBlissScan` as an Ewoks task and return ``out_urls``.

    Signature-compatible with :func:`split_bliss_scan` when called with
    keyword arguments, so tests can parametrize over both.
    """
    task = SplitBlissScan(inputs=kwargs)
    task.execute()
    return task.outputs.out_urls
@pytest.mark.parametrize("split_function", [split_bliss_scan, split_bliss_scan_task])
def test_split_bliss_scan(tmp_path, split_function):
bliss_scan_data = {
"1.1": {
"end_time": "",
......@@ -57,13 +65,21 @@ def test_split_bliss_scan(tmp_path):
dictdump.dicttoh5(bliss_scan_data, in_file)
out_file = str(tmp_path / "out.h5")
split_bliss_scan(in_file, 1, "group1/dataset3", out_file)
out_urls = split_function(
filename=in_file,
scan_number=1,
monotonic_channel="group1/dataset3",
out_filename=out_file,
)
assert out_urls == [f"silx://{out_file}::/1.1", f"silx://{out_file}::/1.2"]
split_data = _normalize_h5data(dictdump.h5todict(out_file, asarray=False))
assert split_data == expected_split_data
def test_split_bliss_scan_timeout(tmp_path):
@pytest.mark.parametrize("split_function", [split_bliss_scan, split_bliss_scan_task])
def test_split_bliss_scan_timeout(tmp_path, split_function):
bliss_scan_data = {
"1.1": {
"@attr1": "value1",
......@@ -85,8 +101,19 @@ def test_split_bliss_scan_timeout(tmp_path):
out_file = tmp_path / "out.h5"
with pytest.raises(RetryTimeoutError):
split_bliss_scan(in_file, 1, "group1/dataset3", str(out_file), retry_timeout=1)
if split_function is split_bliss_scan_task:
exc = RuntimeError
else:
exc = RetryTimeoutError
with pytest.raises(exc):
_ = split_function(
filename=in_file,
scan_number=1,
monotonic_channel="group1/dataset3",
out_filename=out_file,
retry_timeout=0.1,
)
assert not out_file.exists()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment