diff --git a/nexus_writer_service/subscribers/base_subscriber.py b/nexus_writer_service/subscribers/base_subscriber.py index b9afb69b99b5b7d7d0c3b4fd0d66a40a5d61af97..7c546bf88a11e8abccadee3b523b9ae000309e4b 100644 --- a/nexus_writer_service/subscribers/base_subscriber.py +++ b/nexus_writer_service/subscribers/base_subscriber.py @@ -55,7 +55,7 @@ def get_node(node_type, db_name): ) -class BaseSubscriber(object): +class BaseSubscriber: """ Listen to events of a particular Redis node @@ -84,14 +84,70 @@ class BaseSubscriber(object): STATES.FAULT: [], } + @enum.unique + class PROFILE_PARAMETERS(enum.IntEnum): + OFF = enum.auto() + CPU30 = enum.auto() + CPU50 = enum.auto() + CPU100 = enum.auto() + WALL30 = enum.auto() + WALL50 = enum.auto() + WALL100 = enum.auto() + MEM30 = enum.auto() + MEM50 = enum.auto() + MEM100 = enum.auto() + + _profile_arguments = { + PROFILE_PARAMETERS.OFF: {}, + PROFILE_PARAMETERS.CPU30: { + "memory": False, + "time": True, + "clock": "cpu", + "timelimit": 30, + }, + PROFILE_PARAMETERS.CPU50: { + "memory": False, + "time": True, + "clock": "cpu", + "timelimit": 50, + }, + PROFILE_PARAMETERS.CPU100: { + "memory": False, + "time": True, + "clock": "cpu", + "timelimit": 100, + }, + PROFILE_PARAMETERS.WALL30: { + "memory": False, + "time": True, + "clock": "wall", + "timelimit": 30, + }, + PROFILE_PARAMETERS.WALL50: { + "memory": False, + "time": True, + "clock": "wall", + "timelimit": 50, + }, + PROFILE_PARAMETERS.WALL100: { + "memory": False, + "time": True, + "clock": "wall", + "timelimit": 100, + }, + PROFILE_PARAMETERS.MEM30: {"memory": True, "time": False, "memlimit": 30}, + PROFILE_PARAMETERS.MEM50: {"memory": True, "time": False, "memlimit": 50}, + PROFILE_PARAMETERS.MEM100: {"memory": True, "time": False, "memlimit": 100}, + } + def __init__( - self, db_name, node_type=None, parentlogger=None, resource_profiling=False + self, db_name, node_type=None, parentlogger=None, resource_profiling=None ): """ :param str db_name: :param str node_type: :param parentlogger: - :param bool resource_profiling: + :param PROFILE_PARAMETERS resource_profiling: """ self.state = self.STATES.INIT self.state_reason = "instantiation" @@ -331,19 +387,9 @@ class BaseSubscriber(object): Greenlet main function without the resource (de)allocation """ try: - if self.resource_profiling: - with profiling.profile( - memory=False, - time=True, - memlimit=30, - timelimit=30, - sortby="tottime", - clock="cpu", - color=False, - filename=True, - units="MB", - logger=logger, - ): + kw = self._get_profile_arguments() + if kw: + with profiling.profile(**kw): self._listen_event_loop() else: self._listen_event_loop() @@ -368,6 +414,33 @@ class BaseSubscriber(object): if self.end_time is None: self.end_time = datetime.datetime.now() + @property + def resource_profiling(self): + return self._resource_profiling + + @resource_profiling.setter + def resource_profiling(self, value): + if value is None: + value = self.PROFILE_PARAMETERS.OFF + self._resource_profiling = value + + def _get_profile_arguments(self): + """ + :returns dict or None: + """ + kwargs = self._profile_arguments.get(self.resource_profiling, None) + if kwargs: + kwargs.update( + { + "sortby": "tottime", + "color": False, + "filename": True, + "units": "MB", + "logger": logger, + } + ) + return kwargs + def _listen_event_loop(self, **kwargs): """ Listen to Redis events diff --git a/nexus_writer_service/subscribers/scan_writer_base.py b/nexus_writer_service/subscribers/scan_writer_base.py index 9715f665a5319d8711f275921161344b581ca0cb..1d928573117f2b5def330cec2167d8eb81e33eba 100644 --- a/nexus_writer_service/subscribers/scan_writer_base.py +++ b/nexus_writer_service/subscribers/scan_writer_base.py @@ -37,6 +37,16 @@ from .nxdata_proxy import NXdataProxy logger = logging.getLogger(__name__) +resource_profiling_choices = base_subscriber.BaseSubscriber.PROFILE_PARAMETERS + + +def resource_profiling_from_string(s): + try: + return resource_profiling_choices[s.upper()] + except KeyError: + raise ValueError() + + cli_saveoptions = { "keepshape": { "dest": "flat", @@ -65,17 +75,23 @@ cli_saveoptions = { }, "resource_profiling": { "dest": "resource_profiling", - "action": "store_true", + "default": resource_profiling_choices.OFF, + "type": resource_profiling_from_string, + "choices": list(resource_profiling_choices), "help": "Enable resource profiling", }, } def default_saveoptions(): - return { - options["dest"]: options["action"] == "store_false" - for options in cli_saveoptions.values() - } + saveoptions = {} + for name, options in cli_saveoptions.items(): + if "default" in options: + v = options["default"] + else: + v = options["action"] == "store_false" + saveoptions[options["dest"]] = v + return saveoptions def timediff(tend, tstart): @@ -88,7 +104,7 @@ def timediff(tend, tstart): return "NaN" -class Subscan(object): +class Subscan: def __init__(self, subscriber, node, parentlogger=None): """ :param BaseSubscriber subscriber: @@ -169,7 +185,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): self, db_name, node_type=None, - resource_profiling=False, + resource_profiling=None, parentlogger=None, **saveoptions, ): @@ -177,7 +193,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): :param str db_name: :param str node_type: :param Logger parentlogger: - :param bool resource_profiling: + :param PROFILE_PARAMETERS resource_profiling: :param saveoptions: """ if parentlogger is None: @@ -353,7 +369,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): def uris(self): filename = self.filename ret = {} - for subscan in self._subscans: + for subscan in list(self._subscans): name = self._nxentry_name(subscan) ret[name.split(".")[-1]] = filename + "::/" + name return [v for _, v in sorted(ret.items())] @@ -384,7 +400,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): @property def scan_number(self): - return self.get_info("scan_nb") + return self.get_info("scan_nb", cache=True) @property def _expected_subscans(self): @@ -396,7 +412,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): @property def _enabled_subscans(self): - for subscan in self._subscans: + for subscan in list(self._subscans): if subscan.enabled: yield subscan @@ -598,7 +614,9 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): self._h5missing("subscan " + repr(subscan.name)) return None try: - name = scan_utils.scan_name(self.node, i + 1) + # name = scan_utils.scan_name(self.node, i + 1) + # More efficient (caching) + name = f"{self.scan_number}.{i + 1}" except AttributeError as e: self._h5missing(str(e)) return None @@ -860,8 +878,8 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): Total bytes (data-only) of all subscans """ nbytes = 0 - for subscan in self._subscans: - for dproxy in subscan.datasets.values(): + for subscan in list(self._subscans): + for dproxy in list(subscan.datasets.values()): if dproxy is not None: nbytes += dproxy.current_bytes return nbytes @@ -872,8 +890,8 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): Mininal/maximal scan data progress """ lst = [] - for subscan in self._subscans: - for dproxy in subscan.datasets.values(): + for subscan in list(self._subscans): + for dproxy in list(subscan.datasets.values()): if dproxy is not None: lst.append(dproxy.progress) if lst: @@ -891,9 +909,9 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): def getprogress(tpl): return tpl[1] - for subscan in self._subscans: + for subscan in list(self._subscans): lst = [] - for dproxy in subscan.datasets.values(): + for dproxy in list(subscan.datasets.values()): if dproxy is not None: lst.append(dproxy.progress_string) if lst: @@ -992,8 +1010,8 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): All detector dimensions """ ret = set() - for subscan in self._subscans: - for dproxy in subscan.datasets.values(): + for subscan in list(self._subscans): + for dproxy in list(subscan.datasets.values()): if dproxy is not None: ret.add(dproxy.detector_ndim) return ret @@ -1132,8 +1150,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): # unknown, unexpected or disabled return dproxy # Proxy already initialized? - datasets = subscan.datasets - dproxy = datasets.get(node.fullname) + dproxy = subscan.datasets.get(node.fullname) if dproxy is not None: return dproxy # Already fully initialized in Redis? @@ -1178,7 +1195,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): parentlogger=subscan.logger, **data_info, ) - datasets[node.fullname] = dproxy + subscan.datasets[node.fullname] = dproxy self._add_to_dataset_links(subscan, dproxy) subscan.logger.debug("New data node " + str(dproxy)) return dproxy @@ -1229,7 +1246,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): :param bliss.data.node.DataNode node: :returns Subscan: """ - for subscan in self._subscans: + for subscan in list(self._subscans): if subscan.hasnode(node): break else: @@ -1522,7 +1539,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): scanshape = self.scan_save_shape(subscan) scanndim = len(scanshape) scanshapes = [] - for dproxy in subscan.datasets.values(): + for dproxy in list(subscan.datasets.values()): scanshapes.append(dproxy.current_scan_save_shape) if expand: scanshape = tuple(max(lst) if any(lst) else 1 for lst in zip(*scanshapes)) @@ -1530,7 +1547,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): scanshape = tuple( min(i for i in lst if i) if any(lst) else 1 for lst in zip(*scanshapes) ) - for dproxy in subscan.datasets.values(): + for dproxy in list(subscan.datasets.values()): dproxy.reshape(scanshape, None) def _fetch_new_redis_data(self, dproxy, node, event_data=None): @@ -1641,7 +1658,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): :param bool onlymasters: only positioners that are master in the acquisition chain :returns str, DatasetProxy: fullname and dataset handles """ - for fullname, dproxy in subscan.datasets.items(): + for fullname, dproxy in list(subscan.datasets.items()): if dproxy.device_type in ("positioner", "positionergroup"): if onlyprincipals and dproxy.data_type != "principal": continue @@ -1656,7 +1673,7 @@ class NexusScanWriterBase(base_subscriber.BaseSubscriber): :param Subscan subscan: :returns str, DatasetProxy: fullname and dataset handle """ - for fullname, dproxy in subscan.datasets.items(): + for fullname, dproxy in list(subscan.datasets.items()): if dproxy.device_type not in ("positioner", "positionergroup"): yield fullname, dproxy diff --git a/nexus_writer_service/subscribers/scan_writer_config.py b/nexus_writer_service/subscribers/scan_writer_config.py index 2c48376accc34e45ead5e0df5f4e0d21e7658903..c0d3b907f965e61658fe05c884fba5b5003f34b2 100644 --- a/nexus_writer_service/subscribers/scan_writer_config.py +++ b/nexus_writer_service/subscribers/scan_writer_config.py @@ -36,10 +36,14 @@ cli_saveoptions["stackmca"] = { def default_saveoptions(): - return { - options["dest"]: options["action"] == "store_false" - for options in cli_saveoptions.values() - } + saveoptions = {} + for name, options in cli_saveoptions.items(): + if "default" in options: + v = options["default"] + else: + v = options["action"] == "store_false" + saveoptions[options["dest"]] = v + return saveoptions class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase): @@ -172,7 +176,7 @@ class NexusScanWriterConfigurable(scan_writer_base.NexusScanWriterBase): :param Subscan subscan: :returns str, DatasetProxy: fullname and dataset handle """ - for fullname, dproxy in subscan.datasets.items(): + for fullname, dproxy in list(subscan.datasets.items()): if dproxy.device_type == "mca" and dproxy.data_type == "principal": yield fullname, dproxy diff --git a/nexus_writer_service/subscribers/session_writer.py b/nexus_writer_service/subscribers/session_writer.py index 768db30f027151528274f8251c2de183317970ba..64df55e710877028beed2676f3faed037ba1a84a 100755 --- a/nexus_writer_service/subscribers/session_writer.py +++ b/nexus_writer_service/subscribers/session_writer.py @@ -90,7 +90,7 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber): configurable=True, purge_delay=300, parentlogger=None, - resource_profiling=False, + resource_profiling=None, **saveoptions, ): """ @@ -98,7 +98,8 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber): :param bool configurable: generic or configurable writer :param int purge_delay: purge finished scans after x seconds :param Logger parentlogger: - :param saveoptions: + :param PROFILE_PARAMETERS resource_profiling: + :param **saveoptions: """ self.configurable = configurable self.writers = {} @@ -130,6 +131,8 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber): @resource_profiling.setter def resource_profiling(self, value): + if value is None: + value = self.PROFILE_PARAMETERS.OFF self.writer_saveoptions["resource_profiling"] = value @property @@ -142,7 +145,7 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber): @property def progress_string(self): n = len(self.writers) - nactive = sum(w.active for w in self.writers.values()) + nactive = sum(w.active for w in list(self.writers.values())) return "{} scan writers ({} active)".format(n, nactive) @property @@ -297,15 +300,22 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber): def log_progress(self, msg=None): n = len(self.writers) - nactive = sum(w.active for w in self.writers.values()) + nactive = sum(w.active for w in list(self.writers.values())) if msg: msg = "{} ({} scan writers, {} active)".format(msg, n, nactive) else: msg = "{} scan writers ({} active)".format(n, nactive) self.logger.info(msg) - if self.resource_profiling: + if self.resource_profiling != self.PROFILE_PARAMETERS.OFF: self.log_resources() + def _get_profile_arguments(self): + """ + :returns dict or None: + """ + # No time or CPU profiling for the session + return None + @property def resources(self): nfds = len(process_utils.file_descriptors()) @@ -321,7 +331,7 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber): @property def state(self): if self._state == self.STATES.ON: - if any(writer.active for writer in self.writers.values()): + if any(writer.active for writer in list(self.writers.values())): return self.STATES.RUNNING return self._state @@ -336,7 +346,7 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber): return list( name for name, writer in sorted( - self.writers.items(), key=lambda item: item[1].sort_key + list(self.writers.items()), key=lambda item: item[1].sort_key ) ) @@ -373,7 +383,7 @@ class NexusSessionWriter(base_subscriber.BaseSubscriber): ret[name] = getter(writer) else: for name, writer in sorted( - self.writers.items(), key=lambda item: item[1].sort_key + list(self.writers.items()), key=lambda item: item[1].sort_key ): ret[name] = getter(writer) return ret diff --git a/nexus_writer_service/tango/servers/NexusWriter.py b/nexus_writer_service/tango/servers/NexusWriter.py index 86d8fa80fdae1985d5338cddf70f18fdd9682cad..6ec44f76364617d91d1aa6bc8fddd0823ad5b070 100644 --- a/nexus_writer_service/tango/servers/NexusWriter.py +++ b/nexus_writer_service/tango/servers/NexusWriter.py @@ -19,7 +19,7 @@ import tango from tango import DebugIt from tango.server import run -from tango.server import Device, DeviceMeta +from tango.server import Device from tango.server import attribute, command from tango.server import device_property from tango import AttrQuality, DispLevel, DevState @@ -34,6 +34,7 @@ import re import os import sys import itertools +from tango import Attribute from nexus_writer_service.patching import freeze_subprocess import nexus_writer_service @@ -117,11 +118,32 @@ read_log_level = { write_log_level = {v: k for k, v in read_log_level.items()} +read_resource_profiling = { + p: i for i, p in enumerate(NexusScanWriterBase.PROFILE_PARAMETERS) +} +write_resource_profiling = {i: p for p, i in read_resource_profiling.items()} + + # PROTECTED REGION END # // NexusWriter.additionnal_import __all__ = ["NexusWriter", "main"] +class Resource_profiling(enum.IntEnum): + """Python enumerated type for Resource_profiling attribute.""" + + OFF = 0 + CPU30 = 1 + CPU50 = 2 + CPU100 = 3 + WALL30 = 4 + WALL50 = 5 + WALL100 = 6 + MEM30 = 7 + MEM50 = 8 + MEM100 = 9 + + class Writer_log_level(enum.IntEnum): """Python enumerated type for Writer_log_level attribute.""" @@ -176,19 +198,38 @@ class NexusWriter(Device): - Type:'DevBoolean' """ - __metaclass__ = DeviceMeta # PROTECTED REGION ID(NexusWriter.class_variable) ENABLED START # + def getfromself(self, name): + """Get python attribute + """ + v = getattr(self, name) + if isinstance(v, Attribute): + # Tango attribute + return getattr(self.proxy, name) + else: + # Tango property or python attribute + return v + @property def saveoptions(self): saveoptions = session_writer.default_saveoptions() - for attr, attrinfo in session_writer.all_cli_saveoptions().items(): - option = attrinfo["dest"] - store_true = attrinfo["action"] == "store_true" - try: - saveoptions[option] = getattr(self, attr) == store_true - except AttributeError: - continue + for name, info in session_writer.all_cli_saveoptions().items(): + if "action" in info: + store_true = info["action"] == "store_true" + try: + v = self.getfromself(name) == store_true + except AttributeError: + continue + else: + try: + v = self.getfromself(name) + except AttributeError: + continue + if name == "resource_profiling": + # isinstance(v, Resource_profiling) is not True ??? + v = write_resource_profiling[v] + saveoptions[info["dest"]] = v return saveoptions # PROTECTED REGION END # // NexusWriter.class_variable @@ -217,7 +258,9 @@ class NexusWriter(Device): # Attributes # ---------- - resource_profiling = attribute(dtype="DevBoolean", access=AttrWriteType.READ_WRITE) + resource_profiling = attribute( + dtype=Resource_profiling, access=AttrWriteType.READ_WRITE + ) writer_log_level = attribute( dtype=Writer_log_level, access=AttrWriteType.READ_WRITE @@ -251,6 +294,7 @@ class NexusWriter(Device): """Initialises the attributes and properties of the NexusWriter.""" Device.init_device(self) # PROTECTED REGION ID(NexusWriter.init_device) ENABLED START # + self.proxy = tango.DeviceProxy(self.get_name()) self.session_writer = getattr(self, "session_writer", None) if self.session_writer is None: log_levels.init_tango_log_level(device=self) @@ -283,13 +327,13 @@ class NexusWriter(Device): def read_resource_profiling(self): # PROTECTED REGION ID(NexusWriter.resource_profiling_read) ENABLED START # """Return the resource_profiling attribute.""" - return self.session_writer.resource_profiling + return read_resource_profiling[self.session_writer.resource_profiling] # PROTECTED REGION END # // NexusWriter.resource_profiling_read def write_resource_profiling(self, value): # PROTECTED REGION ID(NexusWriter.resource_profiling_write) ENABLED START # """Set the resource_profiling attribute.""" - self.session_writer.resource_profiling = value + self.session_writer.resource_profiling = write_resource_profiling[value] # PROTECTED REGION END # // NexusWriter.resource_profiling_write def read_writer_log_level(self): diff --git a/nexus_writer_service/tango/servers/NexusWriter.xmi b/nexus_writer_service/tango/servers/NexusWriter.xmi index c771ae6b0b9796c6f920d61d45e570406dd0bb5e..59d88e4ccaa7e7b94b83cce7f1d3fdfa7d3ce57b 100644 --- a/nexus_writer_service/tango/servers/NexusWriter.xmi +++ b/nexus_writer_service/tango/servers/NexusWriter.xmi @@ -1,6 +1,6 @@ - + @@ -162,12 +162,22 @@ - + + OFF + CPU30 + CPU50 + CPU100 + WALL30 + WALL50 + WALL100 + MEM30 + MEM50 + MEM100 diff --git a/nexus_writer_service/utils/profiling.py b/nexus_writer_service/utils/profiling.py index 6b84c1069297ca1d9d83358e87312e1e1219db86..9380602472bebfe63a21e5b01cbce7fdf16234f4 100644 --- a/nexus_writer_service/utils/profiling.py +++ b/nexus_writer_service/utils/profiling.py @@ -17,6 +17,7 @@ import os from io import StringIO import logging from contextlib import contextmanager, ExitStack +from time import time from .logging_utils import log from ..io import io_utils @@ -57,9 +58,22 @@ def durationfmt(x): return "%8.3f" % x -def print_malloc_snapshot( - snapshot, logger=None, key_type="lineno", limit=10, units="KB" -): +@contextmanager +def execute_min_duration(func, min_duration=None): + """Execute function when the time >= min_duration. + + :param callable func: + :param num min_duration: + """ + t0 = time() + try: + yield + finally: + if not min_duration or (time() - t0) >= min_duration: + func() + + +def log_malloc_snapshot(snapshot, logger=None, key_type="lineno", limit=10, units="KB"): """ :param tracemalloc.Snapshot snapshot: :param str key_type: @@ -111,7 +125,7 @@ DEFAULT_SORTBY = "tottime" DEFAULT_CLOCK = "cpu" -def print_pstats_snapshot( +def log_pstats_snapshot( snapshot, logger=None, timelimit=None, sortby=None, color=False ): """ @@ -153,7 +167,7 @@ pstats_to_yappi_sort = { } -def print_yappi_snapshot(snapshot, logger=None, sortby=None): +def log_yappi_snapshot(snapshot, logger=None, sortby=None): """ :param YFuncStat snapshot: :param logger: @@ -175,25 +189,37 @@ def print_yappi_snapshot(snapshot, logger=None, sortby=None): @contextmanager -def memory_context(logger=None, **kwargs): +def memory_context(logger=None, min_duration=None, **kwargs): """ :param logger: - :param **kwargs: see print_malloc_snapshot + :param **kwargs: see log_malloc_snapshot """ if tracemalloc is None: log(logger, "tracemalloc required") return + tracemalloc.clear_traces() tracemalloc.start() + + def func(): + snapshot = tracemalloc.take_snapshot() + log_malloc_snapshot(snapshot, logger=logger, **kwargs) + try: - yield + with execute_min_duration(func, min_duration=min_duration): + yield finally: - snapshot = tracemalloc.take_snapshot() - print_malloc_snapshot(snapshot, logger=logger, **kwargs) + tracemalloc.stop() @contextmanager def time_context_cprofile( - logger=None, timelimit=None, clock=None, sortby=None, color=False, filename=None + logger=None, + timelimit=None, + clock=None, + sortby=None, + color=False, + filename=None, + min_duration=None, ): """ :param logger: @@ -202,15 +228,15 @@ def time_context_cprofile( :param str sortby: sort time profile :param bool color: :param str or bool filename: + :param num min_duration: no output when time < min_duration """ pr = cProfile.Profile() pr.enable() - try: - yield - finally: - pr.disable() + + def func(): + nonlocal filename snapshot = pstats.Stats(pr) - print_pstats_snapshot( + log_pstats_snapshot( snapshot, logger=logger, timelimit=timelimit, sortby=sortby, color=color ) if filename: @@ -221,10 +247,23 @@ def time_context_cprofile( snapshot.dump_stats(filename) log(logger, f"Statistics saved as {repr(filename)}") + with execute_min_duration(func, min_duration=min_duration): + try: + yield + finally: + pr.disable() + @contextmanager def time_context_yappi( - logger=None, timelimit=None, clock=None, sortby=None, color=False, filename=None + logger=None, + timelimit=None, + clock=None, + sortby=None, + color=False, + filename=None, + min_duration=None, + yappi_output=False, ): """ :param logger: @@ -233,23 +272,26 @@ def time_context_yappi( :param str sortby: sort time profile :param bool color: :param str or bool filename: + :param num min_duration: no output when time < min_duration + :param bool yappi_output: output formatted by yappi """ if clock not in ("cpu", "wall"): clock = DEFAULT_CLOCK yappi.set_clock_type(clock) yappi.clear_stats() yappi.start(builtins=False) - try: - yield - finally: - yappi.stop() + + def func(): + nonlocal filename stats = yappi.get_func_stats() yappi.clear_stats() - # print_yappi_snapshot(stats) - snapshot = yappi.convert2pstats(stats) - print_pstats_snapshot( - snapshot, logger=logger, timelimit=timelimit, sortby=sortby, color=color - ) + if yappi_output: + log_yappi_snapshot(stats) + else: + snapshot = yappi.convert2pstats(stats) + log_pstats_snapshot( + snapshot, logger=logger, timelimit=timelimit, sortby=sortby, color=color + ) if filename: if not isinstance(filename, str): filename = DEFAULT_FILENAME @@ -260,6 +302,12 @@ def time_context_yappi( # snapshot.save(filename) log(logger, f"Statistics saved as {repr(filename)}") + with execute_min_duration(func, min_duration=min_duration): + try: + yield + finally: + yappi.stop() + if yappi is None: time_context = time_context_cprofile @@ -269,7 +317,7 @@ else: @contextmanager def profile_context( - memory=True, + memory=False, time=True, memlimit=10, timelimit=None, @@ -279,6 +327,7 @@ def profile_context( filename=None, units="KB", logger=None, + min_duration=None, ): """ :param bool memory: profile memory usage @@ -291,6 +340,7 @@ def profile_context( :param str or bool filename: dump for visual tools :param str units: memory units :param logger: + :param num min_duration: no output when time < min_duration """ with ExitStack() as stack: if time: @@ -301,10 +351,13 @@ def profile_context( color=color, filename=filename, logger=logger, + min_duration=min_duration, ) stack.enter_context(ctx) if memory: - ctx = memory_context(limit=memlimit, units=units, logger=logger) + ctx = memory_context( + limit=memlimit, units=units, logger=logger, min_duration=min_duration + ) stack.enter_context(ctx) yield diff --git a/nexus_writer_service/utils/scan_utils.py b/nexus_writer_service/utils/scan_utils.py index 8bd95f8f652ecfadbb427523d10da3023152d9a8..049a8a5fe85944cb0f8d03d7229bd047a2026252 100644 --- a/nexus_writer_service/utils/scan_utils.py +++ b/nexus_writer_service/utils/scan_utils.py @@ -32,6 +32,20 @@ def scan_info(scan): return scan.info.get_all() +def scan_info_get(scan, key, default=None): + """ + :param Scan or ScanNode scan: + :param str key: + :param default: + """ + try: + # Scan + return scan.scan_info.get(key, default) + except AttributeError: + # ScanNode + return scan.info.get(key, default) + + def scan_node(scan): """ :param Scan or ScanNode scan: @@ -61,9 +75,10 @@ def scan_name(scan, subscan=1): :param Scan or ScanNode scan: :returns str: """ - info = scan_info(scan) - if info["data_writer"] == "nexus": - return "{}.{}".format(info["scan_nb"], subscan) + data_writer = scan_info_get(scan, "data_writer") + if data_writer == "nexus": + scan_nb = scan_info_get(scan, "scan_nb") + return f"{scan_nb}.{subscan}" else: name = scan_node(scan).name if subscan == 1: @@ -80,7 +95,7 @@ def scan_filename(scan): :param Scan or ScanNode scan: :returns str or None: """ - return scan_info(scan).get("filename", None) + return scan_info_get(scan, "filename") @config_utils.with_scan_saving @@ -106,8 +121,8 @@ def scan_master_filenames(scan, config=True): """ if not config: return {} - info = scan_info(scan) - return info.get("nexuswriter", {}).get("masterfiles", {}) + info = scan_info_get(scan, "nexuswriter", {}) + return info.get("masterfiles", {}) @config_utils.with_scan_saving @@ -141,12 +156,12 @@ def scan_filenames(scan, config=True): :returns dict(str): """ filenames = {} - info = scan_info(scan) - filename = info.get("filename", None) + filename = scan_info_get(scan, "filename", None) if filename: filenames["dataset"] = filename if config: - filenames.update(info.get("nexuswriter", {}).get("masterfiles", {})) + info = scan_info_get(scan, "nexuswriter", {}) + filenames.update(info.get("masterfiles", {})) return filenames @@ -197,7 +212,7 @@ def scan_uris(scan, subscan=None): try: tree = scan.acq_chain._tree except AttributeError: - nsubscans = len(scan_info(scan)["acquisition_chain"]) + nsubscans = len(scan_info_get(scan, "acquisition_chain", [])) else: nsubscans = len(tree.children(tree.root)) subscans = range(1, nsubscans + 1) diff --git a/tests/nexus_writer/conftest.py b/tests/nexus_writer/conftest.py index ce0b573f4796411ec979be4a00b6cb98c345beb4..0d3bb60ab5c68d775a9aa8aba147135e0a675c75 100644 --- a/tests/nexus_writer/conftest.py +++ b/tests/nexus_writer/conftest.py @@ -259,12 +259,11 @@ def writer_process(session=None, tmpdir=None, config=True, alt=False, **kwargs): yield greenlet -def writer_options(tango=True, config=True, alt=False, resource_profiling=False): +def writer_options(tango=True, config=True, alt=False): """ :param bool tango: launch writer as process/tango server :param bool config: writer uses/ignores extra Redis info :param bool alt: anable all options (all disabled by default) - :param bool resource_profiling: """ fixed = ( "copy_non_external", @@ -273,18 +272,18 @@ def writer_options(tango=True, config=True, alt=False, resource_profiling=False) "disable_external_hdf5", ) options = all_cli_saveoptions(configurable=config) + resource_profiling = options.pop("resource_profiling")["default"] if tango: properties = {"copy_non_external": True} attributes = {} properties["noconfig"] = not config - attributes["resource_profiling"] = resource_profiling + attributes["resource_profiling"] = int(resource_profiling) - 1 # to tango enum properties.update({k: alt for k in options if k not in fixed}) else: cliargs = ["--copy_non_external"] if not config: cliargs.append("--noconfig") - if resource_profiling: - cliargs.append("--resource_profiling") + cliargs.append("--resource_profiling " + resource_profiling.name) if alt: cliargs += ["--" + k for k in options if k not in fixed] if tango: diff --git a/tests/nexus_writer/test_nxw_tango.py b/tests/nexus_writer/test_nxw_tango.py index 1877e29899ec90524e81167134932110e6adf993..25752647889522b1e849f8cab5d728c149dc0aba 100644 --- a/tests/nexus_writer/test_nxw_tango.py +++ b/tests/nexus_writer/test_nxw_tango.py @@ -20,7 +20,6 @@ def test_nxw_tango_logging(nexus_writer_config): _test_nxw_tango_logging(**nexus_writer_config) -@pytest.mark.xfail(reason="Writer is not responsive enough") def test_nxw_tango_api(nexus_writer_config): _test_nxw_tango_api(**nexus_writer_config) diff --git a/tests/test_configuration/sessions/nexus_writer_sessions.yml b/tests/test_configuration/sessions/nexus_writer_sessions.yml index a8371f748d82b941e419aa4df1b7891dfdf1e27e..f28c17977693532878b2aea77fa13e0b389abdc9 100644 --- a/tests/test_configuration/sessions/nexus_writer_sessions.yml +++ b/tests/test_configuration/sessions/nexus_writer_sessions.yml @@ -1,6 +1,17 @@ - class: Session name: nexus_writer_session setup-file: ./nexus_writer_session_setup.py + scan_saving: + class: ESRFScanSaving + beamline: id00 + tmp_data_root: + fs1: /tmp/scans/fs1/{beamline}/tmp + fs2: /tmp/scans/fs2/{beamline}/tmp + icat_tmp_data_root: /tmp/scans/fsi/{beamline}/tmp_icat + visitor_data_root: + fs1: /tmp/scans/fs1/visitor + fs3: /tmp/scans/fs3/visitor + inhouse_data_root: /tmp/scans/fsi/{beamline}/inhouse config-objects: - m1 - roby