scan_writer_base.py 62.2 KB
Newer Older
1 2
# -*- coding: utf-8 -*-
#
3
# This file is part of the nexus writer service of the BLISS project.
4
#
5 6 7 8 9
# Code is maintained by the ESRF Data Analysis Unit.
#
# Original author: Wout de Nolf
#
# Copyright (c) 2015-2019 ESRF
10 11 12
# Distributed under the GNU LGPLv3. See LICENSE for more info.

"""
13
Basic Nexus writer listening to Redis events of a scan
14 15 16 17 18 19 20 21 22
"""

import gevent
import os
import numpy
import traceback
import logging
import datetime
from contextlib import contextmanager
Wout De Nolf's avatar
Wout De Nolf committed
23
from bliss.data.node import get_node
24
from . import devices
Wout De Nolf's avatar
Wout De Nolf committed
25
from . import dataset_proxy
26
from . import reference_proxy
Wout De Nolf's avatar
Wout De Nolf committed
27
from . import base_subscriber
28
from ..io import nexus
29
from ..utils import scan_utils
Wout De Nolf's avatar
Wout De Nolf committed
30 31
from ..utils.logging_utils import CustomLogger
from ..utils.array_order import Order
32 33 34 35 36


logger = logging.getLogger(__name__)


Wout De Nolf's avatar
Wout De Nolf committed
37 38 39 40 41 42 43 44 45
# Save options that can be toggled from the command line.
# Maps the option name to its "dest" (key in the save options dictionary),
# its "action" ("store_true" or "store_false") and its "help" text.
# NOTE(review): presumably fed to an argparse parser -- confirm at the caller.
cli_saveoptions = {
    "keepshape": dict(
        dest="flat",
        action="store_false",
        help="Keep shape of multi-dimensional grid scans",
    ),
    "multivalue_positioners": dict(
        dest="multivalue_positioners",
        action="store_true",
        help="Group positioners values",
    ),
    "enable_external_nonhdf5": dict(
        dest="allow_external_nonhdf5",
        action="store_true",
        help="Enable external non-hdf5 files like edf (ABSOLUTE LINK!)",
    ),
    "disable_external_hdf5": dict(
        dest="allow_external_hdf5",
        action="store_false",
        help="Disable external hdf5 files (virtual datasets)",
    ),
    "copy_non_external": dict(
        dest="copy_non_external",
        action="store_true",
        help="Copy data instead of saving the uri when external linking is disabled",
    ),
    "resource_profiling": dict(
        dest="resource_profiling",
        action="store_true",
        help="Enable resource profiling",
    ),
}


Wout De Nolf's avatar
Wout De Nolf committed
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
def default_saveoptions():
    """
    Default value of every save option listed in `cli_saveoptions`.

    A "store_false" option is on unless given on the command line, so its
    default is True; every other action defaults to False.

    :returns dict: option "dest" -> default bool
    """
    defaults = {}
    for options in cli_saveoptions.values():
        defaults[options["dest"]] = options["action"] == "store_false"
    return defaults


class Subscan(object):
    """
    Writer-side state of one subscan: identification copied from its
    data node, an enabled flag, the dataset/reference proxies and a
    cache for info lookups.
    """

    def __init__(self, subscriber, node, parentlogger=None):
        """
        :param BaseSubscriber subscriber:
        :param DataNode node:
        :param Logger parentlogger:
        """
        self.name = node.name
        self.db_name = node.db_name
        self.fullname = node.fullname
        self.node_type = node.node_type

        self.enabled = True
        self.datasets = {}  # bliss.data.node.DataNode.fullname -> DatasetProxy
        self.references = {}  # bliss.data.node.DataNode.fullname -> ReferenceProxy
        self._info_cache = {}  # cache calls to self.get_info
        if parentlogger is None:
            parentlogger = logger
        self.logger = CustomLogger(parentlogger, self)

        # Associate resources with greenlet: the node object is stored on
        # the subscriber's greenlet so that `node` can return it again.
        glt = subscriber._greenlet
        try:
            nodes = glt._subscan_nodes
        except AttributeError:
            nodes = glt._subscan_nodes = {}
        nodes[self.db_name] = node
        self._subscriber = subscriber

    def hasnode(self, node):
        """
        Does the node's db_name start with the db_name of this subscan?

        :param DataNode node:
        :returns bool:
        """
        return node.db_name.startswith(self.db_name)

    def __repr__(self):
        return self.db_name

    def __str__(self):
        return "{} ({})".format(self.name, "ENABLED" if self.enabled else "DISABLED")

    @property
    def node(self):
        """
        The data node of this subscan: the instance stored on the
        subscriber's greenlet when the subscriber reports a local greenlet,
        otherwise a node freshly retrieved with `get_node`.
        """
        if self._subscriber._local_greenlet:
            return self._subscriber._greenlet._subscan_nodes[self.db_name]
        else:
            return get_node(self.node_type, self.db_name)

    def get_info(self, key, default=None, cache=False):
        """
        Get from the node's info dictionary.
        Try subscribers's info dictionary when key is missing.

        :param str key:
        :param default: never cached
        :param bool cache: cache this value when retrieved
        :returns: the info value or `default`
        """
        if cache:
            try:
                return self._info_cache[key]
            except KeyError:
                pass
        try:
            result = self.node.info[key]
        except KeyError:
            result = self._subscriber.get_info(key, default=default, cache=cache)
            if result is default:
                # The fallback most likely handed back the default: do not
                # cache it, otherwise a value which becomes available later
                # would be hidden forever. A real value that happens to be
                # the same object as the default is merely not cached.
                return result
        if cache:
            self._info_cache[key] = result
        return result
147 148


Wout De Nolf's avatar
Wout De Nolf committed
149
class NexusScanWriterBase(base_subscriber.BaseSubscriber):
150
    """
Wout De Nolf's avatar
Wout De Nolf committed
151
    Listen to events of a particular scan and write the result in Nexus format.
152 153 154
    No configuration needed.
    """

Wout De Nolf's avatar
Wout De Nolf committed
155 156 157 158
    def __init__(
        self,
        db_name,
        node_type=None,
        resource_profiling=False,
        parentlogger=None,
        **saveoptions
    ):
        """
        Initialize the subscriber, complete the save options with their
        defaults and fixed values, and reset the cached state.

        :param str db_name:
        :param str node_type:
        :param bool resource_profiling:
        :param Logger parentlogger: module logger when None
        :param saveoptions: see `cli_saveoptions`
        """
        super().__init__(
            db_name,
            node_type=node_type,
            parentlogger=logger if parentlogger is None else parentlogger,
            resource_profiling=resource_profiling,
        )

        # Save options: fill in what the caller did not provide ...
        for option, default in default_saveoptions().items():
            saveoptions.setdefault(option, default)
        # ... and force the options that are not configurable
        saveoptions.update(
            short_names=True,
            expand_variable_length=True,
            hold_file_open=True,
            enable_file_locking=True,
            swmr=False,
        )
        self.saveoptions = saveoptions
        # Only the save order is tracked: a publish order of "C" does not
        # hold for snake scans and the plot order currently equals the
        # save order.
        self.saveorder = Order("C")
        self._h5flush_task_period = 0.5  # flushing blocks the gevent loop

        # Cache
        self._filename = None
        self._subscans = set()  # set(Subscan)
        self._devices = {}  # str -> dict(subscan.name:dict)
        self._nxroot = {}  # for recursive calling
        self._nxentry = None  # for recursive calling
199

Wout De Nolf's avatar
Wout De Nolf committed
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
    def _listen_event_loop(self, **kwargs):
        """
        Listen to Redis events, optionally while holding the HDF5 file open.

        Returns immediately when no saving is requested.

        :raises RuntimeError: saving is requested but there is no filename
        """
        if not self.save:
            self.logger.info("No saving requested")
            return
        if not self.filename:
            raise RuntimeError("No filename specified")
        if not self.saveoptions["hold_file_open"]:
            super()._listen_event_loop(**kwargs)
            return
        # Keep the file open for the whole loop and hand it to the loop
        with self.nxroot() as nxroot:
            kwargs["nxroot"] = nxroot
            super()._listen_event_loop(**kwargs)
215

Wout De Nolf's avatar
Wout De Nolf committed
216 217 218 219 220 221
    def _event_loop_initialize(self, **kwargs):
        """
        Executed at the start of the event loop: logs the destination file,
        the writer class and the save options.
        """
        super()._event_loop_initialize(**kwargs)
        message = "Start writing to {} with options ({}) {}".format(
            repr(self.filename), self.__class__.__name__, self.saveoptions
        )
        self.logger.info(message)
226

Wout De Nolf's avatar
Wout De Nolf committed
227
    def _event_loop_finalize(self, **kwargs):
        """
        Executed at the end of the event loop: finalizes the HDF5 content
        (exceptions are captured, not raised), reports progress and then
        runs the parent finalization.
        """
        with self._capture_finalize_exceptions():
            self._finalize_hdf5()
        progress = "Finished writing to {}".format(repr(self.filename))
        self.log_progress(progress)
        super()._event_loop_finalize(**kwargs)
235

Wout De Nolf's avatar
Wout De Nolf committed
236
    def _register_event_loop_tasks(self, nxroot=None, **kwargs):
        """
        Tasks to be run periodically after successfully processing a Redis
        event. Adds a periodic flush of the HDF5 file when one is provided.

        :param nxroot: object with a `flush` method, or None
        """
        super()._register_event_loop_tasks(**kwargs)
        if nxroot is None:
            return
        flush_task = base_subscriber.PeriodicTask(
            nxroot.flush, self._h5flush_task_period
        )
        self._periodic_tasks.append(flush_task)
245

Wout De Nolf's avatar
Wout De Nolf committed
246
    def _finalize_hdf5(self):
247
        """
Wout De Nolf's avatar
Wout De Nolf committed
248 249 250 251 252 253 254 255 256 257
        Finish writing
        """
        if self._exception_is_fatal:
            self.logger.info("Finalization skipped due fatal errors")
            return
        else:
            self.logger.info("Finalize writing to {}".format(repr(self.filename)))

        self.logger.info("Fetch last data")
        for node in self._nodes:
258 259
            with self._capture_finalize_exceptions():
                self._fetch_data(node, last=True)
260

Wout De Nolf's avatar
Wout De Nolf committed
261 262 263 264
        # Skip because fix length scans can have variable data points
        # self.logger.info(
        #    "Ensure all dataset have the same number of points"
        # )
265
        #
Wout De Nolf's avatar
Wout De Nolf committed
266
        # for subscan in self._enabled_subscans:
267 268
        #    with self._capture_finalize_exceptions():
        #        self._ensure_same_length(subscan)
Wout De Nolf's avatar
Wout De Nolf committed
269

270 271
        self.logger.info("Link external data (VDS or raw)")
        for node in self._nodes:
272 273
            with self._capture_finalize_exceptions():
                self._ensure_dataset_existance(node)
274

275 276 277 278 279
        self.logger.info("Save detector metadata")
        skip = set()
        for node in self._nodes:
            self._fetch_node_metadata(node, skip)

Wout De Nolf's avatar
Wout De Nolf committed
280 281 282 283
        for subscan in self._enabled_subscans:
            self._finalize_subscan(subscan)
            self._mark_done(subscan)

284 285 286 287 288 289 290 291 292 293
    @contextmanager
    def _capture_finalize_exceptions(self):
        try:
            yield
        except BaseException as e:
            self._set_state(self.STATES.FAULT, e)
            self.logger.error(
                "Captured finalization exception:\n{}".format(traceback.format_exc())
            )

Wout De Nolf's avatar
Wout De Nolf committed
294
    def get_subscan_info(self, subscan, key, default=None, cache=False):
295
        """
Wout De Nolf's avatar
Wout De Nolf committed
296 297 298 299 300 301 302 303 304
        Get from the subscan's info dictionary.
        Try scan's info dictionary when key is missing.

        :param Subscan subscan:
        :param str key: subscan node's info key
        :param default:
        :param bool cache:
        """
        return subscan.get_info(key, default=default, cache=cache)
305 306 307 308 309 310

    @property
    def save(self):
        """
        Saving intended for this scan?
        """
311 312 313 314
        return (
            self.get_info("save", False)
            and self.get_info("data_writer", "null") == "nexus"
        )
315 316 317 318

    @property
    def filename(self):
        """
Wout De Nolf's avatar
Wout De Nolf committed
319
        HDF5 file name for data
320
        """
321 322 323
        if not self._filename:
            self._filename = scan_utils.scan_filename(self.node)
        return self._filename
324

Wout De Nolf's avatar
Wout De Nolf committed
325 326 327 328 329 330 331 332 333 334 335 336
    @property
    def uris(self):
        filename = self.filename
        ret = {}
        for subscan in self._subscans:
            name = self._nxentry_name(subscan)
            ret[name.split(".")[-1]] = filename + "::/" + name
        return [v for _, v in sorted(ret.items())]

    def subscan_uri(self, subscan):
        return self.filename + "::/" + self._nxentry_name(subscan)

337 338 339 340 341 342 343 344
    @property
    def config_devices(self):
        return {}

    @property
    def devices(self):
        """
        Maps subscan name to a dictionary of devices,
Wout De Nolf's avatar
Wout De Nolf committed
345 346
        which maps fullname to device info. Ordered
        according to position in acquisition chain.
347 348
        """
        if not self._devices:
349 350
            self._devices = devices.device_info(
                self.config_devices,
Wout De Nolf's avatar
Wout De Nolf committed
351
                self.info,
Wout De Nolf's avatar
Wout De Nolf committed
352
                short_names=self.saveoptions["short_names"],
353 354
                multivalue_positioners=self.saveoptions["multivalue_positioners"],
            )
355 356 357
        return self._devices

    @property
Wout De Nolf's avatar
Wout De Nolf committed
358 359 360 361 362
    def scan_number(self):
        return self.get_info("scan_nb")

    @property
    def _expected_subscans(self):
363 364
        """
        Subscan names for which there are devices defined.
Wout De Nolf's avatar
Wout De Nolf committed
365
        Ordered according to position in acquisition chain.
366 367 368
        """
        return list(sorted(self.devices.keys()))

Wout De Nolf's avatar
Wout De Nolf committed
369 370 371 372 373
    @property
    def _enabled_subscans(self):
        for subscan in self._subscans:
            if subscan.enabled:
                yield subscan
374

Wout De Nolf's avatar
Wout De Nolf committed
375 376 377 378 379 380
    @property
    def _enabled_and_expected_subscans(self):
        expected = self._expected_subscans
        for subscan in self._enabled_subscans:
            if subscan.name in expected:
                yield subscan
381 382 383 384 385

    def device(self, subscan, node):
        """
        Get device information of the node belonging to the subscan

Wout De Nolf's avatar
Wout De Nolf committed
386
        :param Subscan subscan:
387 388 389 390
        :param bliss.data.node.DataNode node:
        :returns dict:
        """
        fullname = node.fullname
Wout De Nolf's avatar
Wout De Nolf committed
391
        subdevices = self.devices[subscan.name]
392 393 394
        device = subdevices.get(fullname, None)
        if device is None:
            device = devices.update_device(subdevices, fullname)
395 396
        if not device["device_type"]:
            device["device_type"] = self._device_type(node)
397 398 399 400 401
        if self.is_scan_group and device["device_type"] == "positioner":
            device["device_type"] = "groupinfo"
            if device["data_name"] == "value":
                device["data_name"] = "data"
        return device
402 403 404 405 406 407 408 409

    def _device_type(self, node):
        """
        Get device type from data node

        :param bliss.data.node.DataNode node:
        :returns str:
        """
410
        return "unknown{}D".format(len(node.shape))
411 412

    @contextmanager
413
    def nxroot(self, filename=None):
414 415 416 417
        """
        Yields the NXroot instance (h5py.File) or None
        when information is missing
        """
418 419 420 421 422
        if not filename:
            filename = self.filename
            if not filename:
                self._h5missing("filename")
        nxroot = self._nxroot.get(filename, None)
423
        if nxroot is None:
424
            if filename:
Wout De Nolf's avatar
Wout De Nolf committed
425 426 427
                try:
                    with nexus.nxRoot(filename, **self._nxroot_kwargs) as nxroot:
                        try:
428
                            self._nxroot[filename] = nxroot
Wout De Nolf's avatar
Wout De Nolf committed
429 430
                            yield nxroot
                        finally:
431
                            self._nxroot[filename] = None
432 433 434 435 436 437
                except OSError as e:
                    if nxroot is None and nexus.isLockedError(e):
                        self._exception_is_fatal = True
                        raise RuntimeError(nexus.lockedErrorMessage(filename)) from None
                    else:
                        raise
438 439 440
            else:
                yield None
        else:
441
            yield nxroot
442

Wout De Nolf's avatar
Wout De Nolf committed
443 444 445 446 447 448 449 450 451
    @property
    def _nxroot_kwargs(self):
        return {
            "mode": "a",
            "enable_file_locking": self.saveoptions["enable_file_locking"],
            "swmr": self.saveoptions["swmr"],
        }

    @contextmanager
452 453
    def _modify_nxroot(self, filename=None):
        with self.nxroot(filename=filename) as nxroot:
Wout De Nolf's avatar
Wout De Nolf committed
454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
            if nxroot is None:
                yield nxroot
            else:
                with nxroot.acquire_lock():
                    yield nxroot

    @property
    def has_write_permissions(self):
        """
        This process has permission to write/create file and/or directory
        """
        filename = self.filename
        if os.path.exists(filename):
            # Check whether we can write to the file
            return os.access(filename, os.W_OK)
        else:
            # Check whether we can create the file (and possibly subdirs)
            dirname = os.path.dirname(filename)
            while dirname and dirname != os.sep:
                if os.path.exists(dirname):
                    if os.path.isdir(dirname):
                        return os.access(dirname, os.W_OK)
                    else:
                        return False
                else:
                    dirname = os.path.dirname(dirname)
            else:
                return False

483 484 485
    @contextmanager
    def nxentry(self, subscan):
        """
Wout De Nolf's avatar
Wout De Nolf committed
486
        :param Subscan subscan:
487 488 489 490 491 492 493 494 495

        Yields the NXentry instance (h5py.Group) or None
        when information is missing
        """
        with self.nxroot() as nxroot:
            if nxroot is None:
                yield None
                return
            if self._nxentry is None:
Wout De Nolf's avatar
Wout De Nolf committed
496
                nxentry = self._get_nxentry(nxroot, subscan)
497 498 499 500 501 502 503 504
                try:
                    self._nxentry = nxentry
                    yield nxentry
                finally:
                    self._nxentry = None
            else:
                yield self._nxentry

Wout De Nolf's avatar
Wout De Nolf committed
505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546
    def _get_nxentry(self, nxroot, subscan):
        """
        Create/get NXentry of subscan

        None is returned when the entry name or the creation arguments are
        not available yet, or when the entry is missing while the subscan
        is disabled. The subscan is disabled when the creation reports
        that the entry already exists.

        :param h5py.File nxroot:
        :param Subscan subscan:
        :returns h5py.Group or None:
        """
        name = self._nxentry_name(subscan)
        if not name:
            return None
        nxentry = nxroot.get(name, None)
        if nxentry is not None:
            return nxentry
        # Create NXentry instance when missing
        if not subscan.enabled:
            return None
        kwargs = self._nxentry_create_args()
        if not kwargs:
            return None
        try:
            nxentry = nexus.nxEntry(nxroot, name, raise_on_exists=True, **kwargs)
        except nexus.NexusInstanceExists:
            subscan.enabled = False
        self._on_subscan_creation(subscan)
        return nxentry

    def _on_subscan_creation(self, subscan):
        """
        Actions taken right after subscan NXentry creation

        :param Subscan subscan:
        """
        uri = repr(self.subscan_uri(subscan))
        name = repr(subscan.name)
        if subscan.enabled:
            msg = "Start writing subscan {} to {}"
            msg = msg.format(name, uri)
            self.logger.info(msg)
        else:
            msg = "Writing subscan {} is disabled destination {} exists (probably another writer exists for this session)"
            msg = msg.format(name, uri)
            self._set_state(self.STATES.FAULT, msg)

547 548
    def _nxentry_name(self, subscan):
        """
        Name of the NXentry associated with a subscan

        :param Subscan subscan:
        :returns str or None: None when the subscan is not expected or
                              when the name cannot be derived yet
        """
        try:
            # The entry index is the 1-based position of the subscan
            # name in the list of expected subscans
            position = self._expected_subscans.index(subscan.name)
        except ValueError:
            self._h5missing("subscan " + repr(subscan.name))
            return None
        try:
            return scan_utils.scan_name(self.node, position + 1)
        except AttributeError as e:
            self._h5missing(str(e))
            return None

    def _nxentry_create_args(self):
        """
        Arguments for nxEntry creation

        :returns dict:
        """
Wout De Nolf's avatar
Wout De Nolf committed
574
        start_timestamp = self.get_info("start_timestamp")
575
        if not start_timestamp:
576
            self._h5missing("start_timestamp")
577
            return None
Wout De Nolf's avatar
Wout De Nolf committed
578
        title = self.get_info("title")
579
        if not title:
580
            self._h5missing("title")
581 582
            return None
        start_time = datetime.datetime.fromtimestamp(start_timestamp)
583 584
        datasets = {"title": title}
        kwargs = {"start_time": start_time, "datasets": datasets}
585 586 587 588 589
        return kwargs

    @contextmanager
    def nxmeasurement(self, subscan):
        """
        Yields the "measurement" collection of the subscan's NXentry
        (h5py.Group) or None when NXentry is missing

        :param Subscan subscan:
        """
        with self.nxentry(subscan) as nxentry:
            if nxentry is not None:
                yield nexus.nxCollection(nxentry, "measurement")
            else:
                yield None
598

Wout De Nolf's avatar
Wout De Nolf committed
599 600 601 602 603 604 605 606 607 608 609 610
    @contextmanager
    def nxnotes(self, subscan):
        """
        Yields the "notes" collection of the subscan's NXentry
        (h5py.Group) or None when NXentry is missing

        :param Subscan subscan:
        """
        with self.nxentry(subscan) as nxentry:
            if nxentry is not None:
                yield nexus.nxCollection(nxentry, "notes")
            else:
                yield None

611 612 613 614
    def _h5missing(self, variable):
        """
        :param str variable:
        """
615 616 617
        self.logger.debug(
            "HDF5 group not created yet ({} missing)".format(repr(variable))
        )
618 619 620 621 622 623

    @property
    def plots(self):
        """
        NXdata signals
        """
624
        return {"plot{}D".format(ndim): {"ndim": ndim} for ndim in self.detector_ndims}
625 626 627 628 629 630

    @property
    def plotselect(self):
        """
        Default NXdata group
        """
631
        return "plot0D"
632 633 634 635 636

    def _create_plots(self, subscan):
        """
        Create the plots (NXdata groups) of a subscan and mark the default.

        The default is the plot named like `plotselect`, else the first
        plot created, else the NXentry itself. Nothing is done when the
        NXentry is missing.

        :param Subscan subscan:
        """
        with self.nxentry(subscan) as nxentry:
            if nxentry is None:
                return
            plotselect = self.plotselect
            default_plot = None
            plots = self.plots
            subscan.logger.info("Create {} plots".format(len(plots)))
            for basename, plotparams in plots.items():
                if basename in nxentry:
                    subscan.logger.warning(
                        "Cannot create plot {} (name already exists)".format(
                            repr(basename)
                        )
                    )
                    continue
                signaldict = self._select_plot_signals(subscan, basename, **plotparams)
                if not signaldict:
                    continue
                # Create axes belonging to the signals and save plots
                fmt = self._plot_name_format(signaldict)
                ordered = sorted(signaldict.items())
                for index, (key, value) in enumerate(ordered, 1):
                    name, scan_shape, detector_shape = key
                    devicetype, signals = value
                    plotname = fmt.format(name, devicetype, index)
                    axes = self._select_plot_axes(subscan, scan_shape, detector_shape)
                    plot = self._create_plot(nxentry, plotname, signals, axes)
                    if default_plot is None or plotname == plotselect:
                        default_plot = plot
                    subscan.logger.info("Plot " + repr(plotname) + " created")
            # Default plot
            with self._modify_nxroot():
                nexus.markDefault(nxentry if default_plot is None else default_plot)
674 675 676 677 678

    def _select_plot_signals(self, subscan, plotname, ndim=-1, grid=False):
        """
        Select plot signals based on detector dimensions.

Wout De Nolf's avatar
Wout De Nolf committed
679
        :param Subscan subscan:
680 681 682 683 684 685 686 687 688 689 690 691
        :param str plotname:
        :param int ndim: detector dimensions
        :param bool grid: preserve scan shape
        :returns dict: (str, tuple): (str, [(name, value, attrs)])
        """
        signaldict = {}
        if ndim >= 0:
            for fullname, dproxy in self.detector_iter(subscan):
                if dproxy.detector_ndim == ndim:
                    self._add_signal(plotname, grid, dproxy, signaldict)
        return signaldict

Wout De Nolf's avatar
Wout De Nolf committed
692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
    @staticmethod
    def _plot_name_format(signaldict):
        """
        When signals have different dimensions, the plot is split in multiple plots.

        :param dict signaldict: (str, tuple): (str, [(name, value, attrs)])
        :returns str:
        """
        fmt = "{}"
        plotnames = set(fmt.format(name) for name, _, _ in signaldict.keys())
        if len(plotnames) != len(signaldict):
            fmt = "{}_{}"
            plotnames = set(
                fmt.format(name, devicetype)
                for (name, _, _), (devicetype, signals) in signaldict.items()
            )
            if len(plotnames) != len(signaldict):
                fmt = "{}_{}{}"
        return fmt

712 713 714 715 716 717 718 719 720 721 722 723
    def _add_signal(self, plotname, grid, dproxy, signaldict):
        """
        Add dataset to NXdata signal dictionary

        The dataset is linked as is when its layout already matches the
        requested one (`dproxy.reshaped == grid`). Otherwise it is exposed
        with the requested shape through a virtual dataset, which needs
        VDS support and a target shape with the same number of points.
        Nothing is added when the dataset cannot be opened, has no link
        name or cannot be reshaped.

        :param str plotname:
        :param bool grid: preserve scan shape
        :param DatasetProxy dproxy:
        :param dict signaldict: (plotname, scan shape, detector shape)
                                -> (device type, [(name, value, attrs)])
        """
        with dproxy.open() as dset:
            if dset is None:
                return
            linkname = dproxy.linkname
            if not linkname:
                dproxy.logger.warning("cannot be linked too")
                return
            # Determine signal shape
            # NOTE(review): assumes `reshaped` tells whether the dataset
            # was saved with the grid shape -- confirm in DatasetProxy.
            if dproxy.reshaped != grid:
                # Layout differs from the requested one: reshape needed
                if not nexus.HASVIRTUAL:
                    dproxy.logger.error(
                        "Cannot reshape for plot {} due to missing VDS support".format(
                            repr(plotname)
                        )
                    )
                    return
                if grid:
                    shape = dproxy.grid_shape
                else:
                    shape = dproxy.flat_shape
                npoints = dataset_proxy.shape_to_size(dset.shape)
                enpoints = dataset_proxy.shape_to_size(shape)
                if npoints != enpoints:
                    dproxy.logger.error(
                        "Cannot reshape {} to {} for plot {}".format(
                            dset.shape, shape, repr(plotname)
                        )
                    )
                    return
            else:
                shape = dset.shape
            # Arguments for dataset creation
            attrs = {}
            scan_shape, detector_shape = dataset_proxy.split_shape(
                shape, dproxy.detector_ndim
            )
            if shape == dset.shape:
                # Same shape so this will be a link
                value = dset
            else:
                # Different shape so this will be a virtual dataset
                scan_ndim = len(scan_shape)
                detector_ndim = len(detector_shape)
                value = {"data": nexus.getUri(dset), "shape": shape}
                interpretation = nexus.nxDatasetInterpretation(
                    scan_ndim, detector_ndim, scan_ndim
                )
                attrs["interpretation"] = interpretation
            # Add arguments to signaldict: signals with the same plot name
            # and shapes end up in the same plot
            signal = (linkname, value, attrs)
            key = plotname, scan_shape, detector_shape
            if key not in signaldict:
                signaldict[key] = dproxy.device_type, []
            lst = signaldict[key][1]
            lst.append(signal)

    def _create_plot(self, nxentry, plotname, signals, axes):
        """
        Create one NXdata group and attach signals and (optionally) axes.

        :param h5py.Group nxentry: passed to `nexus.nxData` together
                                   with `plotname`
        :param str plotname: name given to `nexus.nxData`
        :param list signals: handed to `nexus.nxDataAddSignals`
        :param list axes: handed to `nexus.nxDataAddAxes`; skipped when empty
        :returns h5py.Group: the value returned by `nexus.nxData`
        """
        nxdata = nexus.nxData(nxentry, plotname)
        nexus.nxDataAddSignals(nxdata, signals)
        if not axes:
            # Nothing to attach besides the signals
            return nxdata
        nexus.nxDataAddAxes(nxdata, axes)
        return nxdata

    def _select_plot_axes(self, subscan, scan_shape, detector_shape):
        """
Wout De Nolf's avatar
Wout De Nolf committed
795
        :param Subscan subscan:
796 797 798 799 800 801 802 803 804 805
        :param str scan_shape: signal scan shape
        :param str scan_shape: signal detector shape
        :returns list(3-tuple): name, value, attrs
        """
        scan_ndim = len(scan_shape)
        detector_ndim = len(detector_shape)
        ndim = scan_ndim + detector_ndim
        if scan_ndim:
            # Scan axes dict
            axes = {}
806 807 808
            for fullname, dproxy in self.positioner_iter(
                subscan, onlyprincipals=True, onlymasters=True
            ):
809 810 811 812 813 814
                with dproxy.open(dproxy) as dset:
                    if dset is None:
                        continue
                    axes[dproxy.master_index] = dproxy.linkname, dset, None

            # TODO: Scatter plot of non-scalar detector
Wout De Nolf's avatar
Wout De Nolf committed
815 816
            #       does not have a visualization so no
            #       axes for now
817 818 819 820 821 822 823
            flattened = scan_ndim != len(axes)
            if flattened and detector_ndim:
                return []

            # Sort axes
            #   grid plot: fast axis last
            #   scatter plot: fast axis first
Wout De Nolf's avatar
Wout De Nolf committed
824
            if self.saveorder.order == "C" and not flattened:
825 826 827 828 829 830 831 832 833 834
                # Fast axis last
                keys = reversed(sorted(axes))
            else:
                # Fast axis first
                keys = sorted(axes)

            # Dataset creation arguments
            lst = []
            for i, key in enumerate(keys):
                name, value, attrs = axes[key]
Wout De Nolf's avatar
Wout De Nolf committed
835
                if scan_ndim > 1:  # Grid plot (e.g. image)
836 837 838 839 840 841 842
                    # Axis values: shape of scan
                    value = value[()]
                    if value.ndim == 1:
                        value = value.reshape(scan_shape)
                    avgdims = tuple(j for j in range(value.ndim) if i != j)
                    # Average along all but axis dimension
                    value = value.mean(avgdims)
Wout De Nolf's avatar
Wout De Nolf committed
843 844 845 846 847
                    # Make linear
                    x = numpy.arange(value.size)
                    mask = numpy.isfinite(value)
                    try:
                        m, b = numpy.polyfit(x[mask], value[mask], 1)
848
                    except BaseException:
Wout De Nolf's avatar
Wout De Nolf committed
849 850 851 852 853 854
                        value = numpy.linspace(
                            numpy.nanmin(value), numpy.nanmax(value), value.size
                        )
                    else:
                        value = m * x + b
                else:  # Scatter plot (coordinates and value)
855
                    if value.ndim > 1:
856
                        value = {"data": nexus.getUri(value), "shape": scan_shape}
857 858 859 860 861 862 863
                lst.append((name, value, attrs))
            axes = lst
        else:
            axes = []

        # Add axes for the data dimensions (which are always at the end)
        if ndim - len(axes) == detector_ndim:
864 865 866 867
            axes += [
                ("datadim{}".format(i), numpy.arange(detector_shape[i]), None)
                for i in range(detector_ndim)
            ]
868 869 870 871 872 873
        return axes

    def _mark_done(self, subscan):
        """
        The NXentry is completed

Wout De Nolf's avatar
Wout De Nolf committed
874
        :param Subscan subscan:
875
        """
Wout De Nolf's avatar
Wout De Nolf committed
876 877 878
        if self.state == self.STATES.FAULT:
            subscan.logger.warning("Data NOT marked as DONE in HDF5")
            return
879 880
        with self.nxentry(subscan) as nxentry:
            if nxentry is not None:
Wout De Nolf's avatar
Wout De Nolf committed
881
                with self._modify_nxroot():
882
                    nexus.updated(nxentry, final=True, parents=True)
Wout De Nolf's avatar
Wout De Nolf committed
883
                subscan.logger.info("Data marked as DONE in HDF5")
884 885 886

    @property
    def instrument_name(self):
        """
        Instrument title; empty in this base implementation, in which
        case `nxinstrument` does not add a "title" dataset.

        :returns str:
        """
        return ""
888

889 890 891 892 893 894 895 896
    @property
    def positioner_info(self):
        """
        The "positioners" entry of the scan info
        (an empty dict is passed as the default).
        """
        fallback = {}
        return self.get_info("positioners", fallback)

    @property
    def motors(self):
        """
        Keys of the "positioners_start" entry of `positioner_info`
        (empty list when that entry is missing).

        :returns list:
        """
        start_positions = self.positioner_info.get("positioners_start", {})
        return list(start_positions)

897 898 899 900 901 902
    @contextmanager
    def nxinstrument(self, subscan):
        """
        Yields the NXinstrument instance (h5py.Group) or None
        when NXentry is missing

        The group is requested under the name "instrument"; a "title"
        dataset is passed along only when `instrument_name` is not empty.

        :param Subscan subscan:
        """
        with self.nxentry(subscan) as nxentry:
            if nxentry is None:
                yield None
            else:
                datasets = {}
                title = self.instrument_name
                if title:
                    datasets["title"] = title
                yield nexus.nxInstrument(nxentry, "instrument", datasets=datasets)
914 915 916 917 918 919 920

    @contextmanager
    def nxdetector(self, subscan, name, **kwargs):
        """
        Yields the NXdetector group returned by `nexus.nxDetector`
        or None when the NXinstrument is not available

        :param Subscan subscan:
        :param str name: name passed to `nexus.nxDetector`
        :param **kwargs: forwarded to `nexus.nxDetector`
        """
        with self.nxinstrument(subscan) as nxinstrument:
            if nxinstrument is None:
                yield None
            else:
                yield nexus.nxDetector(nxinstrument, name, **kwargs)

    @contextmanager
    def nxpositioner(self, subscan, name, **kwargs):
        """
        Yields the NXpositioner group returned by `nexus.nxPositioner`
        or None when the NXinstrument is not available

        :param Subscan subscan:
        :param str name: name passed to `nexus.nxPositioner`
        :param **kwargs: forwarded to `nexus.nxPositioner`
        """
        with self.nxinstrument(subscan) as nxinstrument:
            if nxinstrument is None:
                yield None
            else:
                yield nexus.nxPositioner(nxinstrument, name, **kwargs)

    @contextmanager
    def nxpositioners(self, subscan, suffix=""):
        """
        Yields the generic positioners instance (h5py.Group) or None
        when NXentry is missing

        The collection is requested under the name "positioners" + suffix.

        :param Subscan subscan:
        :param str suffix: appended to the collection name
        """
        with self.nxinstrument(subscan) as nxinstrument:
            if nxinstrument is None:
                yield None
            else:
                yield nexus.nxCollection(nxinstrument, "positioners" + suffix)
956

Wout De Nolf's avatar
Wout De Nolf committed
957
    def _init_subscan(self, subscan):
958
        """
Wout De Nolf's avatar
Wout De Nolf committed
959 960
        Things that can already be saved right after
        receiving the new subscan event.
Wout De Nolf's avatar
Wout De Nolf committed
961

Wout De Nolf's avatar
Wout De Nolf committed
962
        :param Subscan subscan:
Wout De Nolf's avatar
Wout De Nolf committed
963
        """
Wout De Nolf's avatar
Wout De Nolf committed
964 965 966
        self._save_positioners(subscan)

    def _finalize_subscan(self, subscan):
Wout De Nolf's avatar
Wout De Nolf committed
967
        """
Wout De Nolf's avatar
Wout De Nolf committed
968
        Save final subscan data.
969

Wout De Nolf's avatar
Wout De Nolf committed
970
        :param Subscan subscan:
971
        """
972 973 974 975 976 977 978 979
        with self._capture_finalize_exceptions():
            self._save_positioners(subscan)
        with self._capture_finalize_exceptions():
            self._create_plots(subscan)
        with self._capture_finalize_exceptions():
            self._fetch_subscan_metadata(subscan)
        with self._capture_finalize_exceptions():
            self._fetch_subscan_notes(subscan)
980

Wout De Nolf's avatar
Wout De Nolf committed
981 982 983 984
    @property
    def current_bytes(self):
        """
        Total bytes (data-only) of all subscans

        Sums `current_bytes` of every dataset proxy that is not None.

        :returns int:
        """
        nbytes = 0
        for subscan in self._subscans:
            for dproxy in subscan.datasets.values():
                if dproxy is not None:
                    nbytes += dproxy.current_bytes
        return nbytes

Wout De Nolf's avatar
Wout De Nolf committed
993 994
    @property
    def progress(self):
        """
        Minimal/maximal scan data progress

        :returns 2-tuple: smallest and largest `progress` over all
                          dataset proxies, or (0, 0) when there are none
        """
        lst = []
        for subscan in self._subscans:
            # Iterate over a copy of the values
            for dproxy in list(subscan.datasets.values()):
                if dproxy is not None:
                    lst.append(dproxy.progress)
        if lst:
            return min(lst), max(lst)
        else:
            return 0, 0
1007 1008

    @property
    def progress_string(self):
        """
        Minimal/maximal scan data progress as a string

        Each dataset proxy's `progress_string` is used as a pair:
        item 0 is the text shown and item 1 is the value compared.
        With one subscan the result is "<min>-<max>"; with several,
        the per-subscan strings are prefixed with "<name>:" and joined
        by spaces.

        :returns str:
        """
        progress = []
        for subscan in self._subscans:
            lst = []
            # Iterate over a copy of the values
            for dproxy in list(subscan.datasets.values()):
                if dproxy is not None:
                    lst.append(dproxy.progress_string)
            if lst:

                def getprogress(tpl):
                    return tpl[1]

                subscan_progress = "{}-{}".format(
                    min(lst, key=getprogress)[0], max(lst, key=getprogress)[0]
                )
            else:
                subscan_progress = "0pts-0pts"
            progress.append((subscan.name, subscan_progress))
        if progress:
            if len(progress) == 1:
                return progress[0][1]
            else:
                return " ".join([name + ":" + s for name, s in progress])
        else:
            return "0pts-0pts"

    def log_progress(self, msg=None):
        """
        Log progress, data size and duration at INFO level.

        :param str msg: optional prefix; when given, the status
                        is appended between parentheses
        """
        nbytes = dataset_proxy.format_bytes(self.current_bytes)
        status = self.progress_string, nbytes, self.duration
        if msg:
            text = "{} ({} {} {})".format(msg, *status)
        else:
            text = " {} {} {}".format(*status)
        self.logger.info(text)

    @property
    def info_string(self):
        """
        One-line summary: state name, progress, data size, duration,
        start time and end time ("not finished" while `endtime` is unset).

        :returns str:
        """
        timefmt = "%Y-%m-%d %H:%M:%S"
        nbytes = dataset_proxy.format_bytes(self.current_bytes)
        statename = self.state.name
        status = self.progress_string
        started = self.starttime.strftime(timefmt)
        finished = self.endtime
        finished = finished.strftime(timefmt) if finished else "not finished"
        template = "{} {} {} {}, start: {}, end: {}"
        return template.format(
            statename, status, nbytes, self.duration, started, finished
        )
1062 1063 1064 1065 1066 1067 1068

    @property
    def detector_ndims(self):
        """
        All detector dimensions

        :returns set: `detector_ndim` of every dataset proxy
                      that is not None, over all subscans
        """
        return {
            dproxy.detector_ndim
            for subscan in self._subscans
            for dproxy in subscan.datasets.values()
            if dproxy is not None
        }

    def _process_event(self, event_type, node):
        """
        Process event belonging to this scan
        """
        if event_type.name == "NEW_NODE":
            self._event_new_node(node)
        elif event_type.name == "NEW_DATA_IN_CHANNEL":
            self._event_new_data(node)
        else:
            event_info = event_type.name, node.type, node.name, node.fullname
1085
            self.logger.debug("Untreated event: {}".format(event_info))
1086 1087 1088 1089 1090 1091

    def _event_new_node(self, node):
        """
        Creation of a new Redis node
        """
        name = repr(node.name)
1092
        if node.type in ["scan", "scan_group"]:
1093
            self.logger.debug("Start scan " + name)
1094 1095
            self._event_start_scan(node)
        elif node.type == "channel":
1096
            self.logger.debug("New channel " + name)
1097
            self._event_new_datanode(node)
1098 1099 1100
        elif node.type == "node_ref_channel":
            self.logger.debug("New reference channel " + name)
            self._event_new_datanode(node)
1101
        elif node.type == "lima":
1102
            self.logger.debug("New Lima " + name)
1103
            self._event_new_datanode(node)
1104
        elif node.parent.type in ["scan", "scan_group"]:
1105
            self.logger.debug("New subscan " + name)
1106 1107
            self._event_new_subscan(node)
        else:
Wout De Nolf's avatar
Wout De Nolf committed
1108
            self.logger.debug(
1109
                "New {} node event on {} not treated".format(repr(str(node.type)), name)
Wout De Nolf's avatar
Wout De Nolf committed
1110
            )
1111 1112 1113 1114 1115

    def _event_new_subscan(self, node):
        """
        Register and initialize the subscan belonging to a node.

        :param bliss.data.node.DataNodeContainer node:
        """
        subscan = Subscan(self, node, parentlogger=self.logger)
        self.logger.info("New subscan " + repr(subscan.name))
        if subscan.name not in self._expected_subscans:
            # Unexpected subscans are only reported: they are still
            # registered and initialized like expected ones.
            self._log_unexpected_subscan(subscan)
        self._subscans.add(subscan)
        self._init_subscan(subscan)

    def _log_unexpected_subscan(self, subscan):
        msg = "Subscan {} not one of the expected subscans {}".format(
            repr(subscan.name), self._expected_subscans
        )
        self.logger.warning(msg)
1130 1131 1132 1133 1134 1135 1136 1137 1138

    def _event_new_data(self, node):
        """
        Creation of a new data node
        """
        # New data in an existing Redis node
        name = repr(node.fullname)
        if node.type == "channel":
            self._fetch_data(node)
1139 1140
        elif node.type == "node_ref_channel":
            self._fetch_data(node)
1141 1142 1143
        elif node.type == "lima":
            self._fetch_data(node)
        else:
1144
            self.logger.warning(
1145
                "New {} data for {} not treated".format(repr(str(node.type)), name)
1146
            )
1147 1148 1149 1150 1151

    def _event_start_scan(self, scan):
        """
        Scan has started

Wout De Nolf's avatar
Wout De Nolf committed
1152
        :param bliss.data.nodes.scan.Scan scan:
1153
        """
Wout De Nolf's avatar
Wout De Nolf committed
1154 1155 1156 1157 1158
        title = self.get_info("title", "")
        self.logger.info("title = " + repr(title))
        if not self.has_write_permissions:
            self._exception_is_fatal = True
            raise RuntimeError("Cannot write to {}".format(self.filename))
1159 1160 1161 1162 1163 1164 1165

    def _event_new_datanode(self, node):
        """
        New data node is created

        :param bliss.data.node.DataNode node:
        """
1166 1167
        # Node will appear in HDF5 but not
        # necessarily with data (dset.shape[0] == 0)
Wout De Nolf's avatar
Wout De Nolf committed
1168
        self._nodes.append(node)
1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187
        self._node_proxy(node)

    @property
    def is_scan_group(self):
        """
        Whether the node of this writer has type "scan_group"

        :returns bool:
        """
        node_type = self.node.type
        return node_type == "scan_group"

    def _node_proxy(self, node):
        """
        Get node proxy associated with node (create when needed).
        The subscan to which the node belongs too must be known,
        expected and enabled.

        :param bliss.data.node.DataNode node:
        :returns DatasetProxy or None:
        """
        if node.type == "node_ref_channel":
            return self._reference_proxy(node)
        else:
            return self._dataset_proxy(node)
1188

Wout De Nolf's avatar
Wout De Nolf committed
1189
    def _dataset_proxy(self, node):
1190
        """
Wout De Nolf's avatar
Wout De Nolf committed
1191 1192 1193
        Get dataset proxy associated with node (create when needed).
        The subscan to which the node belongs too must be known,
        expected and enabled.
1194 1195

        :param bliss.data.node.DataNode node:
Wout De Nolf's avatar
Wout De Nolf committed
1196
        :returns DatasetProxy or None:
1197
        """
Wout De Nolf's avatar
Wout De Nolf committed
1198 1199 1200 1201 1202
        dproxy = None
        subscan = self._datanode_subscan(node)
        if subscan is None:
            # unknown, unexpected or disabled
            return dproxy
1203
        # Already initialized?