Commit 68d53e44 authored by Wout De Nolf's avatar Wout De Nolf Committed by Matias Guijarro

[nexus writer] adapt to ESRF data policy

parent 615f22a4
......@@ -85,7 +85,7 @@ def get_uri(p):
def ensure_existence(
session_name,
server="nexuswriter",
instance="nexuswriters",
instance=None,
domain=None,
family="bliss_nxwriter",
member=None,
......@@ -96,7 +96,7 @@ def ensure_existence(
:param str session_name:
:param str server: device server name
:param str instance: device server instance
:param str instance: device server instance (Default: `session_name`)
:param str domain: location of device
:param str family: type of device
:param str member: device name (Default: `session_name`)
......@@ -108,6 +108,8 @@ def ensure_existence(
member = session_name
if not domain:
domain = beamline()
if not instance:
instance = session_name
dev_name = "/".join([domain, family, member])
if use_existing:
pdev_name = find_session_writer(session_name, db=db)
......@@ -166,9 +168,7 @@ def main():
parser = argparse.ArgumentParser(
description="Register Tango device of session writing"
)
parser.add_argument(
"session", type=str, default="session", help="Bliss session name"
)
parser.add_argument("session", type=str, help="Bliss session name")
parser.add_argument(
"--server",
type=str,
......@@ -178,8 +178,8 @@ def main():
parser.add_argument(
"--instance",
type=str,
default="nexuswriters",
help="Server instance name ('nexuswriters' by default)",
default="",
help="Server instance name (session name by default)",
)
parser.add_argument(
"--domain",
......@@ -200,9 +200,9 @@ def main():
"--ignore_existing",
action="store_false",
dest="use_existing",
help="Ignore existing writer forthis session",
help="Ignore existing writer for this session",
)
logging_utils.add_cli_args(parser)
# logging_utils.add_cli_args(parser)
args, unknown = parser.parse_known_args()
ensure_existence(
args.session,
......
......@@ -14,12 +14,8 @@ API available in bliss session
from nexus_writer_service import *
"""
from .utils import data_policy
from .utils.data_policy import *
from .utils import scan_utils
from .utils.scan_utils import *
__all__ = []
__all__.extend(data_policy.__all__)
__all__.extend(scan_utils.__all__)
......@@ -43,7 +43,7 @@ cli_saveoptions = {
"multivalue_positioners": {
"dest": "multivalue_positioners",
"action": "store_true",
"help": "Allow positioners with multiple values",
"help": "Group positioners values",
},
"enable_external_nonhdf5": {
"dest": "allow_external_nonhdf5",
......@@ -60,7 +60,7 @@ cli_saveoptions = {
"action": "store_true",
"help": "Copy data instead of saving the uri when external linking is disabled",
},
"enable_profiling": {
"resource_profiling": {
"dest": "resource_profiling",
"action": "store_true",
"help": "Enable resource profiling",
......@@ -191,11 +191,11 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
self._h5flush_task_period = 0.5 # flushing blocks the gevent loop
# Cache
self._filename = None
self._subscans = set() # set(Subscan)
self._devices = {} # str -> dict(subscan.name:dict)
self._nxroot = {} # for recursive calling
self._nxentry = None # for recursive calling
self._configurable = False
def _listen_event_loop(self, **kwargs):
"""
......@@ -219,8 +219,8 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
"""
super()._event_loop_initialize(**kwargs)
self.logger.info(
"Start writing to {} with options {}".format(
repr(self.filename), self.saveoptions
"Start writing to {} with options ({}) {}".format(
repr(self.filename), self.__class__.__name__, self.saveoptions
)
)
......@@ -306,14 +306,19 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
"""
Saving intended for this scan?
"""
return self.get_info("save", False)
return (
self.get_info("save", False)
and self.get_info("data_writer", "null") == "nexus"
)
@property
def filename(self):
"""
HDF5 file name for data
"""
return self._filename()
if not self._filename:
self._filename = scan_utils.scan_filename(self.node)
return self._filename
@property
def uris(self):
......@@ -327,32 +332,6 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
def subscan_uri(self, subscan):
return self.filename + "::/" + self._nxentry_name(subscan)
def _filename(self, level=0):
"""
HDF5 file name for data and masters
"""
try:
filename = self.filenames[level]
if not os.path.dirname(filename):
self.logger.warning(
"Filename {} has no directory specified".format(repr(filename))
)
filename = ""
except IndexError:
filename = ""
return filename
@property
def filenames(self):
"""
:returns list: a list of filenames, the first for the dataset
and the others for the masters
"""
if self.save:
return scan_utils.scan_filenames(self.node, config=self._configurable)
else:
return []
@property
def config_devices(self):
return {}
......@@ -429,22 +408,25 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
return "unknown{}D".format(len(node.shape))
@contextmanager
def nxroot(self, level=0):
def nxroot(self, filename=None):
"""
Yields the NXroot instance (h5py.File) or None
when information is missing
"""
nxroot = self._nxroot.get(level, None)
if not filename:
filename = self.filename
if not filename:
self._h5missing("filename")
nxroot = self._nxroot.get(filename, None)
if nxroot is None:
filename = self._filename(level=level)
if filename:
try:
with nexus.nxRoot(filename, **self._nxroot_kwargs) as nxroot:
try:
self._nxroot[level] = nxroot
self._nxroot[filename] = nxroot
yield nxroot
finally:
self._nxroot[level] = None
self._nxroot[filename] = None
except OSError as e:
if nxroot is None and nexus.isLockedError(e):
self._exception_is_fatal = True
......@@ -452,8 +434,6 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
else:
raise
else:
self._h5missing("filenames")
self._nxroot[level] = None
yield None
else:
yield nxroot
......@@ -467,8 +447,8 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
}
@contextmanager
def _modify_nxroot(self, level=0):
with self.nxroot(level=level) as nxroot:
def _modify_nxroot(self, filename=None):
with self.nxroot(filename=filename) as nxroot:
if nxroot is None:
yield nxroot
else:
......@@ -1592,7 +1572,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber):
result = []
for refnode in node.get(rproxy.npoints, -1):
if refnode.info.get("save"):
result += scan_utils.scan_uris(refnode, config=self._configurable)
result += scan_utils.scan_uris(refnode)
return result
def _fetch_new_references(self, dproxy, node):
......
......@@ -16,12 +16,16 @@ Configurable Nexus writer listening to Redis events of a scan
import os
import re
import datetime
import logging
from contextlib import contextmanager
from . import scan_writer_base
from ..io import nexus
from ..utils import scan_utils
logger = logging.getLogger(__name__)
cli_saveoptions = dict(scan_writer_base.cli_saveoptions)
cli_saveoptions["stackmca"] = {
"dest": "stack_mcas",
......@@ -54,9 +58,10 @@ class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase):
"""
for option, default in default_saveoptions().items():
kwargs[option] = kwargs.get(option, default)
if kwargs.get("parentlogger") is None:
kwargs["parentlogger"] = logger
super().__init__(*args, **kwargs)
self._applications = {"appxrf": self._save_application_xrf}
self._configurable = True
@property
def config_writer(self):
......@@ -530,32 +535,37 @@ class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase):
if incomplete and notfoundmsg:
self.logger.warning(notfoundmsg)
@property
def master_files(self):
"""
:returns list(str):
"""
return list(scan_utils.scan_master_filenames(self.node).values())
def _create_master_links(self, subscan):
"""
Links to the scan's NXentry
:param Subscan subscan:
"""
filenames = self.filenames
n = len(filenames)
if n <= 1:
filenames = self.master_files
if not filenames:
return
filenames = filenames[1:]
with self.nxentry(subscan) as nxentry:
if nxentry is None:
return
self.logger.info("Create scan links in masters ...")
linkname, ext = os.path.splitext(os.path.basename(nxentry.file.filename))
linkname += ": " + nxentry.name[1:]
for level in range(1, n):
with self.nxroot(level=level) as nxroot:
for filename in filenames:
with self.nxroot(filename=filename) as nxroot:
if nxroot is None:
continue
if linkname in nxroot:
continue
self.logger.info(
"Create link {} in master {}".format(
repr(linkname), repr(nxroot.file.filename)
repr(linkname), repr(filename)
)
)
lnk = nexus.createLink(nxroot, linkname, nxentry)
......@@ -46,7 +46,7 @@ def register_metadata_generators(generators):
generators.set("instrument", fill_instrument_name)
generators.set("device_info", fill_device_info)
generators.set("technique", fill_technique_info)
generators.set("filenames", fill_filenames)
generators.set("masterfiles", fill_masterfiles)
def fill_instrument_name(scan):
......@@ -73,12 +73,15 @@ def fill_technique_info(scan):
return {"technique": current_technique_definition()}
def fill_filenames(scan):
def fill_masterfiles(scan):
"""
:param bliss.scanning.scan.Scan scan:
"""
logger.debug("fill filename info")
return {"filenames": scan_utils.current_filenames()}
logger.debug("fill master filenames")
if scan.scan_info["save"]:
return {"masterfiles": scan_utils.session_master_filenames()}
else:
return {}
def fill_device_info(scan):
......
......@@ -104,21 +104,44 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber):
:param Logger parentlogger:
:param saveoptions:
"""
self.configurable = configurable
self.writers = {}
self.writer_saveoptions = saveoptions
self.purge_delay = purge_delay
self.minimal_purge_delay = 5
self._log_task_period = 5
self._fds = {}
if parentlogger is None:
parentlogger = logger
super().__init__(
db_name, resource_profiling=resource_profiling, parentlogger=parentlogger
)
self._log_task_period = 5
self.saveoptions = saveoptions
if configurable:
self._scan_writer_class = scan_writer_config.NexusScanWriterConfigurable
def update_saveoptions(self, **kwargs):
if "configurable" in kwargs:
self.configurable = kwargs.pop("configurable")
self.writer_saveoptions.update(kwargs)
@property
def saveoptions(self):
d = dict(self.writer_saveoptions)
d["configurable"] = self.configurable
return d
@property
def resource_profiling(self):
return self.writer_saveoptions["resource_profiling"]
@resource_profiling.setter
def resource_profiling(self, value):
self.writer_saveoptions["resource_profiling"] = value
@property
def _scan_writer_class(self):
if self.configurable:
return scan_writer_config.NexusScanWriterConfigurable
else:
self._scan_writer_class = scan_writer_base.NexusScanWriterBase
self.writers = {}
self.minimal_purge_delay = 5
self.purge_delay = purge_delay
self._fds = {}
return scan_writer_base.NexusScanWriterBase
@property
def progress_string(self):
......@@ -159,7 +182,9 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber):
Executed at the start of the event loop
"""
super()._event_loop_initialize(**kwargs)
self.logger.info("Writer started with options {}".format(self.saveoptions))
self.logger.info(
"Session writer started with options {}".format(self.saveoptions)
)
self._fds = {}
def _event_loop_finalize(self, **kwargs):
......@@ -225,8 +250,7 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber):
node.db_name,
node_type=node.node_type,
parentlogger=self.logger,
resource_profiling=self.resource_profiling,
**self.saveoptions,
**self.writer_saveoptions,
)
writer.start()
self.writers[node.name] = writer
......
......@@ -33,7 +33,7 @@ import re
import os
import itertools
from tango import LogLevel
from nexus_writer_service.subscribers.session_writer import NexusSessionWriter
from nexus_writer_service.subscribers import session_writer
from nexus_writer_service.subscribers.scan_writer_base import NexusScanWriterBase
from nexus_writer_service.utils.log_levels import tango_log_level
......@@ -49,7 +49,7 @@ logger = logging.getLogger(__name__)
def session_tango_state(state):
SessionWriterStates = NexusSessionWriter.STATES
SessionWriterStates = session_writer.NexusSessionWriter.STATES
if state == SessionWriterStates.INIT:
return DevState.INIT
elif state == SessionWriterStates.ON:
......@@ -117,13 +117,44 @@ class NexusWriter(Device):
session
- Bliss session name
- Type:'DevString'
copy_nonhdf5_data
- Copy EDF and other data formats to HDF5
keepshape
- Keep shape of multi-dimensional grid scans
- Type:'DevBoolean'
multivalue_positioners
- Group positioners values
- Type:'DevBoolean'
enable_external_nonhdf5
- Enable external non-hdf5 files like edf (ABSOLUTE LINK!)
- Type:'DevBoolean'
disable_external_hdf5
- Disable external hdf5 files (virtual datasets)
- Type:'DevBoolean'
copy_non_external
- Copy data instead of saving the uri when external linking is disabled
- Type:'DevBoolean'
noconfig
- Do not use extra writer information from Redis
- Type:'DevBoolean'
stackmca
- Merged MCA datasets in application definition
- Type:'DevBoolean'
"""
__metaclass__ = DeviceMeta
# PROTECTED REGION ID(NexusWriter.class_variable) ENABLED START #
@property
def saveoptions(self):
saveoptions = session_writer.default_saveoptions()
for attr, attrinfo in session_writer.all_cli_saveoptions().items():
option = attrinfo["dest"]
store_true = attrinfo["action"] == "store_true"
try:
saveoptions[option] = getattr(self, attr) == store_true
except AttributeError:
continue
return saveoptions
# PROTECTED REGION END # // NexusWriter.class_variable
# -----------------
......@@ -132,7 +163,19 @@ class NexusWriter(Device):
session = device_property(dtype="DevString", mandatory=True)
copy_nonhdf5_data = device_property(dtype="DevBoolean", default_value=False)
keepshape = device_property(dtype="DevBoolean", default_value=False)
multivalue_positioners = device_property(dtype="DevBoolean", default_value=False)
enable_external_nonhdf5 = device_property(dtype="DevBoolean", default_value=False)
disable_external_hdf5 = device_property(dtype="DevBoolean", default_value=False)
copy_non_external = device_property(dtype="DevBoolean", default_value=False)
noconfig = device_property(dtype="DevBoolean", default_value=False)
stackmca = device_property(dtype="DevBoolean", default_value=False)
# ----------
# Attributes
......@@ -175,7 +218,9 @@ class NexusWriter(Device):
_logger.set_level(level)
self.session_writer = getattr(self, "session_writer", None)
if self.session_writer is None:
self.session_writer = NexusSessionWriter(self.session, parentlogger=None)
self.session_writer = session_writer.NexusSessionWriter(
self.session, parentlogger=None, **self.saveoptions
)
self.start()
# PROTECTED REGION END # // NexusWriter.init_device
......@@ -431,8 +476,8 @@ class NexusWriter(Device):
:return:None
"""
# Fill with device properties (already done for attributes)
self.session_writer.saveoptions["copy_non_external"] = self.copy_nonhdf5_data
# Fill with device properties (attributes are already set)
self.session_writer.update_saveoptions(**self.saveoptions)
# Greenlet not running or None
self.session_writer.start()
# PROTECTED REGION END # // NexusWriter.start
......
......@@ -9,7 +9,37 @@
<type xsi:type="pogoDsl:StringType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</deviceProperties>
<deviceProperties name="copy_nonhdf5_data" description="Copy EDF and other data formats to HDF5">
<deviceProperties name="keepshape" description="Keep shape of multi-dimensional grid scans">
<type xsi:type="pogoDsl:BooleanType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<DefaultPropValue>False</DefaultPropValue>
</deviceProperties>
<deviceProperties name="multivalue_positioners" description="Group positioners values">
<type xsi:type="pogoDsl:BooleanType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<DefaultPropValue>False</DefaultPropValue>
</deviceProperties>
<deviceProperties name="enable_external_nonhdf5" description="Enable external non-hdf5 files like edf (ABSOLUTE LINK!)">
<type xsi:type="pogoDsl:BooleanType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<DefaultPropValue>False</DefaultPropValue>
</deviceProperties>
<deviceProperties name="disable_external_hdf5" description="Disable external hdf5 files (virtual datasets)">
<type xsi:type="pogoDsl:BooleanType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<DefaultPropValue>False</DefaultPropValue>
</deviceProperties>
<deviceProperties name="copy_non_external" description="Copy data instead of saving the uri when external linking is disabled">
<type xsi:type="pogoDsl:BooleanType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<DefaultPropValue>False</DefaultPropValue>
</deviceProperties>
<deviceProperties name="noconfig" description="Do not use extra writer information from Redis">
<type xsi:type="pogoDsl:BooleanType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<DefaultPropValue>False</DefaultPropValue>
</deviceProperties>
<deviceProperties name="stackmca" description="Merged MCA datasets in application definition">
<type xsi:type="pogoDsl:BooleanType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<DefaultPropValue>False</DefaultPropValue>
......
......@@ -15,8 +15,10 @@ Bliss session configuration utilities
import os
import re
from functools import wraps
from bliss import current_session
from bliss.config import static
from bliss.scanning.scan_saving import ScanSaving, with_eval_dict
def static_config():
......@@ -74,15 +76,24 @@ def static_root_find(name, default=None, parent=None):
return {}
def current_scan_saving():
"""
Get session's SCAN_SAVING object
def with_scan_saving(func):
"""Pass the current session's SCAN_SAVING instance as a named argument
:returns bliss.scanning.scan.ScanSaving:
:param callable func:
:returns callable:
"""
return current_session.scan_saving
@wraps(func)
def wrapper(*args, **kwargs):
scan_saving = kwargs.get("scan_saving")
if scan_saving is None:
kwargs["scan_saving"] = ScanSaving(current_session.name)
return func(*args, **kwargs)
return wrapper
@with_scan_saving
def scan_saving_get(attr, default=None, scan_saving=None):
"""
Get attribute from the session's scan saving object
......@@ -92,34 +103,21 @@ def scan_saving_get(attr, default=None, scan_saving=None):
:param bliss.scanning.scan.ScanSaving scan_saving:
:returns str:
"""
if scan_saving is None:
scan_saving = current_scan_saving()
return getattr(scan_saving, attr, default)
def scan_saving_attrs(template=None, scan_saving=None, **overwrite):
@with_eval_dict
@with_scan_saving
def scan_saving_eval(template, scan_saving=None, eval_dict=None):
"""
SCAN_SAVING attributes from template
Evaluate template with SCAN_SAVING attributes and properties.
:param str template: SCAN_SAVING.template when missing
:param str template:
:param bliss.scanning.scan.ScanSaving scan_saving:
:param overwrite: overwrite attribute values
:param dict eval_dict:
:returns str:
"""
if scan_saving is None:
scan_saving = current_scan_saving()
if template is None:
template = scan_saving.template
params = {}
for attr in re.findall(r"\{(.*?)\}", template):
if attr in overwrite:
params[attr] = overwrite[attr]
else:
try:
params[attr] = getattr(scan_saving, attr)
except AttributeError:
pass
return params
return scan_saving.eval_template(template, eval_dict=eval_dict)