#!/usr/bin/env python # -*- coding:utf-8 -*- ############################################################################## ## license : ##============================================================================ ## ## File : MetadataManager.py ## ## Project : Metadata Manager for ICAT ## ## This file is part of Tango device class. ## ## Tango is free software: you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation, either version 3 of the License, or ## (at your option) any later version. ## ## Tango is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Tango. If not, see . ## ## ## $Author : christophe.cleva$ ## ## $Revision : $ ## ## $Date : $ ## ## $HeadUrl : $ ##============================================================================ ## This file is generated by POGO ## (Program Obviously used to Generate tango Object) ############################################################################## __all__ = ["MetadataManager", "MetadataManagerClass", "main"] __docformat__ = 'restructuredtext' import tango import sys # Add additional import #----- PROTECTED REGION ID(MetadataManager.additionnal_import) ENABLED START -----# import os.path import sys, traceback import string import json from datetime import datetime from time import gmtime, strftime import logging try: import requests except ImportError: requests = None try: import graypy except ImportError: graypy = None from collections import deque from metadata_manager.icat_ingest_metadata.tomodb import tomodbFile from metadata_manager.icat_ingest_metadata.dataset import dataset, parameter, sample, datafile from metadata_manager.MessagingClient import StompClient from metadata_manager.TangoLoggingHandler import TangoLoggingHandler from metadata_manager.ParameterDefinition import ParameterDefinition from metadata_manager.ICATParameterReader import ICATParameterReader from metadata_manager.MetadataWorker import MetadataWorker from metadata_manager import tangoworker #----- PROTECTED REGION END -----# // MetadataManager.additionnal_import ## Device States Description ## FAULT : Device is not functioning correctly ## ON : Device is ready to record dataset ## OFF : No experiment ongoing ## STANDBY : Experiment started, sample or dataset name is missing ## RUNNING : Dataset currently recorded class MetadataManager (tango.LatestDeviceImpl): """""" def __init__(self, cl, name): tango.LatestDeviceImpl.__init__(self,cl,name) self.metadataWorker = MetadataWorker(self.get_name()) MetadataManager.init_device(self) def delete_device(self): self.debug_stream("In delete_device()") #----- PROTECTED REGION ID(MetadataManager.delete_device) ENABLED START -----# try: if self.expdev.is_locked_by_me(): try: self.expdev.unlock() except Exception as e: self.error_stream("Error unlocking MetaExp: %s" % e) except Exception as e: self.warn_stream("Error accessing MetaExp: %s" % e) try: self.expdev.unsubscribe_event(self.proposalEventId) except Exception as e: self.error_stream("Error unsubscribing: %s" % e) try: self.expdev.unsubscribe_event(self.sampleEventId) except Exception as e: self.error_stream("Error unsubscribing: %s" % e) try: self.expdev.unsubscribe_event(self.dataRootEventId) except Exception as e: self.error_stream("Error unsubscribing: %s" % e) self.client.disconnect() #----- PROTECTED REGION END -----# // MetadataManager.delete_device def init_device(self): self.get_device_properties(self.get_device_class()) self.attr_proposal_read = '' self.attr_sampleName_read = '' self.attr_datasetName_read = '' self.attr_metadataFile_read = '' self.attr_lastDatafile_read = '' self.attr_dataFolder_read = '' self.attr_sampleParameters_read = [''] self.attr_datafileList_read = [''] self.attr_datasetNameList_read = [''] #----- PROTECTED REGION ID(MetadataManager.init_device) ENABLED START -----# # init members self.client = None self.proxy = None self.running = False self.startDate = None self.endDate = None self.metadataFileName = None self.expdev = None self.proposalEventId = None self.sampleEventId = None self.dataRootEventId = None self.dataRoot = '' self.datasetParentLocation = '' # Priority of dataset metadata keys # (highest number is highest priority) # 1. parameters(local): from attributes of this device # 2. labels: from device property # 3. parameters(local): from attributes of other devices # 4. custom: added with SetParameters # 5. sample: # 6. globals: proposal, beamline, etc. self.datasetParamMap = dict() # priority (local:1, remote:3) self.parameterDefinitionMap = dict() self.attributeProxyMap = dict() self.labels_map = dict() # priority (2) self.customParamMap = dict() # priority (4) self.sampleParamMap = dict() # priority (5) self.attr_datafileList_read = [] self.attr_datasetNameList_read = [] self.msgList = deque(maxlen=20) # logger bridge logging.getLogger().setLevel(logging.DEBUG) self.logger = logging.getLogger(self.get_name()) self.logger.addHandler(TangoLoggingHandler(self)) if graypy != None: # this depends on the version of graypy used try: self.logger.addHandler(graypy.GELFHandler(self.graylogServer, self.graylogPort)) except: self.logger.addHandler(graypy.GELFUDPHandler(self.graylogServer, self.graylogPort)) # self client self.proxy = tango.DeviceProxy(self.get_name()) # JMS client # we need to proxy to be created self.client = StompClient(self.queueURLs, self.queueName, self.beamlineID, manager=self, jolokia_port=self.jolokiaPort) self.logger.info("Init MetadataManager. url=%s, queue=%s metadatamanager=%s beamline=%s" % (self.client.getConfigURL(), self.queueName, self.get_name(), self.beamlineID)) self.Connect() # MetaExp client self.expdev = tango.DeviceProxy(self.metaExperimentDevice) self.proposalEventId = self.expdev.subscribe_event("Proposal", tango.EventType.CHANGE_EVENT, self, stateless=True) self.sampleEventId = self.expdev.subscribe_event("Sample", tango.EventType.CHANGE_EVENT, self, stateless=True) self.dataRootEventId = self.expdev.subscribe_event("DataRoot", tango.EventType.CHANGE_EVENT, self, stateless=True) # replicate attributes initialisation without calling write_XXX methods db = tango.Util.instance().get_database() attr_map = db.get_device_attribute_property(self.get_name(),['datasetName','dataFolder','metadataFile']) self.attr_datasetName_read = attr_map['datasetName'].get('__value',[''])[0] self.attr_dataFolder_read = attr_map['dataFolder'].get('__value',[''])[0] self.attr_metadataFile_read = attr_map['metadataFile'].get('__value',[''])[0] if self.attr_proposal_read: self.info_stream("Recovered proposal %s" % self.attr_proposal_read) if self.attr_sampleName_read: self.info_stream("Recovered sample name %s" % self.attr_sampleName_read) if self.attr_datasetName_read: self.info_stream("Recovered dataset name %s" % self.attr_datasetName_read) if self.attr_dataFolder_read: self.info_stream("Recovered data folder %s" % self.attr_dataFolder_read) if self.attr_metadataFile_read: self.info_stream("Recovered metadata file %s" % self.attr_metadataFile_read) self.info_stream("Device init. Proposal=%s sample=%s dataset=%s dataFolder=%s metadataFile=%s" % (self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read, self.attr_dataFolder_read, self.attr_metadataFile_read)) # update state based on attributes self.UpdateState() ''' Some getters for me to easily retrieve the info we will most likely use to log messages ''' def get_proposal(self): return self.attr_proposal_read def get_sample_name(self): return self.attr_sampleName_read def get_dataset_name(self): return self.attr_datasetName_read def get_dataroot(self): return self.dataRoot def always_executed_hook(self): return #----- PROTECTED REGION ID(MetadataManager.always_executed_hook) ENABLED START -----# #----- PROTECTED REGION END -----# // MetadataManager.always_executed_hook #-------------------------------------------------------------------------- # MetadataManager read/write attribute methods #-------------------------------------------------------------------------- def read_beamlineID(self, attr): attr.set_value(self.beamlineID.lower()) def read_proposal(self, attr): attr.set_value(self.attr_proposal_read) def read_datasetState(self, attr): attr.set_value(str(self.dev_state())) def is_proposal_allowed(self, attr): if attr==tango.AttReqType.READ_REQ: state_ok = not(self.get_state() in [tango.DevState.FAULT]) else: state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.RUNNING]) return state_ok def read_sampleName(self, attr): attr.set_value(self.attr_sampleName_read) self.logger.info("read_sampleName. sampleName=%s metadatamanager=%s" % (self.attr_sampleName_read, self.get_name())) def is_sampleName_allowed(self, attr): if attr==tango.AttReqType.READ_REQ: state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.OFF]) else: state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.RUNNING]) return state_ok def read_datasetName(self, attr): attr.set_value(self.attr_datasetName_read) def write_datasetName(self, attr): datasetName = attr.get_write_value() #if datasetName in self.attr_datasetNameList_read: # self.logger.error("datasetName already used. datasetName=%s metadatamanager=%s" % (datasetName, self.get_name())) # tango.Except.throw_exception("datasetName already used: %s" % datasetName, "datasetNames have to be unique for a proposal", "datasetName") if self.is_dataFolder_allowed(tango.AttReqType.WRITE_REQ): self.proxy.write_attribute("dataFolder", "") self.attr_dataFolder_read="" if self.is_metadataFile_allowed(tango.AttReqType.WRITE_REQ): self.proxy.write_attribute("metadataFile", "") self.attr_metadataFile_read="" self.attr_datasetName_read=datasetName self.UpdateState() def is_datasetName_allowed(self, attr): if attr==tango.AttReqType.READ_REQ: state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.OFF]) else: state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.RUNNING]) if state_ok and self.attr_sampleName_read: state_ok = True else: state_ok = False return state_ok def read_metadataFile(self, attr): if self.attr_metadataFile_read : attr.set_value(self.metadataWorker.Sanitize(self.attr_metadataFile_read)) else : attr.set_value(self.metadataWorker.DefaultFileName(self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read)) def write_metadataFile(self, attr): self.attr_metadataFile_read= attr.get_write_value() def is_metadataFile_allowed(self, attr): if attr==tango.AttReqType.READ_REQ: state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.STANDBY]) else: state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.OFF,tango.DevState.STANDBY,tango.DevState.RUNNING]) return state_ok def read_lastDatafile(self, attr): attr.set_value(self.attr_lastDatafile_read) def write_lastDatafile(self, attr): data = attr.get_write_value() if data and data != self.attr_lastDatafile_read: self.attr_datafileList_read.append(data) self.attr_lastDatafile_read = data def is_lastDatafile_allowed(self, attr): if attr==tango.AttReqType.READ_REQ: state_ok = not(self.get_state() in [tango.DevState.FAULT,tango.DevState.ON,tango.DevState.OFF,tango.DevState.STANDBY]) else: state_ok = not(self.get_state() in [tango.DevState.FAULT,tango.DevState.ON,tango.DevState.OFF,tango.DevState.STANDBY]) return state_ok def read_dataFolder(self, attr): if not self.attr_dataFolder_read: self.attr_dataFolder_read = self.DefaultFolder() attr.set_value(self.attr_dataFolder_read) def write_dataFolder(self, attr): self.attr_dataFolder_read = attr.get_write_value() def is_dataFolder_allowed(self, attr): if attr==tango.AttReqType.READ_REQ: state_ok = not(self.get_state() in [tango.DevState.FAULT]) else: state_ok = not(self.get_state() in [tango.DevState.FAULT,tango.DevState.OFF,tango.DevState.STANDBY,tango.DevState.RUNNING]) return state_ok def read_CheckParameters(self): self.debug_stream("In read_CheckParameters()") def is_CheckParameters_allowed(self): self.debug_stream("In is_CheckParameters_allowed()") return True def CheckParameters(self): reader = ICATParameterReader(self.get_name(), self.authenticationPlugin, self.username, self.password, self.server, self.port) return str(reader.compare(self.parameters, self.labels)) # returns the sample parameters set by the SetSampleParameters command as key value pairs # the list is dynamically created on each call # returns "None" if list is empty def read_sampleParameters(self, attr): self.attr_sampleParameters_read=[] for k in self.sampleParamMap: self.attr_sampleParameters_read.append(k) self.attr_sampleParameters_read.append(self.sampleParamMap[k]) if not self.attr_sampleParameters_read: self.attr_sampleParameters_read.append("None") attr.set_value(self.attr_sampleParameters_read) def is_sampleParameters_allowed(self, attr): if attr==tango.AttReqType.READ_REQ: state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.OFF]) else: state_ok = not(self.get_state() in []) if state_ok and self.attr_sampleName_read: state_ok = True else: state_ok = False return state_ok def read_datafileList(self, attr): if self.attr_datafileList_read: attr.set_value(self.attr_datafileList_read) else : attr.set_value(["None"]) def is_datafileList_allowed(self, attr): if attr==tango.AttReqType.READ_REQ: state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.ON, tango.DevState.OFF, tango.DevState.STANDBY]) else: state_ok = not(self.get_state() in []) #----- PROTECTED REGION ID(MetadataManager.is_datafileList_allowed) ENABLED START -----# #----- PROTECTED REGION END -----# // MetadataManager.is_datafileList_allowed return state_ok def read_datasetNameList(self, attr): #----- PROTECTED REGION ID(MetadataManager.datasetNameList_read) ENABLED START -----# attr.set_value(self.attr_datasetNameList_read) #----- PROTECTED REGION END -----# // MetadataManager.datasetNameList_read def read_messageList(self, attr): #----- PROTECTED REGION ID(MetadataManager.messageList_read) ENABLED START -----# attr.set_value(self.msgList) #----- PROTECTED REGION END -----# // MetadataManager.messageList_read def read_parameter(self, attr): try: paramdef = self.parameterDefinitionMap.get(attr.get_name(), '') if paramdef.isRemote(): attr.set_value(paramdef.getValue(self.attributeProxyMap)) else: attr.set_value(self.datasetParamMap.get(attr.get_name(), '')) except: traceback.print_exc(file=sys.stdout) def read_label(self, attr): attr.set_value(self.labels_map[attr.get_name()]) def write_parameter(self, attr): self.debug_stream("In write_parameter()") data = attr.get_write_value() self.datasetParamMap[attr.get_name()]=data def initialize_dynamic_attributes(self): # create dynamic parameters as defined in the device property # these need to be set_memorized_init(True) to be registered in the map on initialisation if self.parameters: for p in self.parameters: # create parameter definiton d = ParameterDefinition(p) pname = d.getParameterName() self.parameterDefinitionMap[pname] = d # add attributes: local -> RW, remote -> RO if d.isLocal(): self.info_stream("Local attribute: %s" %pname) param = tango.Attr(pname, tango.DevString, tango.READ_WRITE) param.set_memorized() #param.set_memorized_init(True) param.set_memorized_init(False) self.add_attribute(param, MetadataManager.read_parameter, MetadataManager.write_parameter, None) # register an empty value on creation, will be populated on initialization self.datasetParamMap[pname] = '' else: self.info_stream("Remote attribute: %s - from: %s" % (pname, d.getParameterDefinition())) param = tango.Attr(pname, tango.DevString, tango.READ) self.add_attribute(param, MetadataManager.read_parameter, None, None) # create device proxies for all remote attributes for dev in d.getRemoteAttributes(): if dev not in self.attributeProxyMap: try: self.attributeProxyMap[dev] = tango.AttributeProxy(dev) except Exception as e: self.error_stream("Error getting attribute proxy for %s: %s" %(dev, e)) if self.labels: for l in self.labels: aa = l.split("=") if len(aa) > 1: pname = aa[0].strip() self.info_stream("Label: %s" % pname) param = tango.Attr(pname, tango.DevString, tango.READ) self.add_attribute(param, MetadataManager.read_label, None, None) self.labels_map[pname] = aa[1] def read_attr_hardware(self, data): self.debug_stream("In read_attr_hardware()") #-------------------------------------------------------------------------- # MetadataManager command methods #-------------------------------------------------------------------------- def dev_state(self): """ This command gets the device state (stored in its device_state data member) and returns it to the caller. :return: Device state :rtype: tango.CmdArgType.DevState """ self.debug_stream("In dev_state()") argout = tango.DevState.UNKNOWN #----- PROTECTED REGION ID(MetadataManager.State) ENABLED START -----# self.Connect() #----- PROTECTED REGION END -----# // MetadataManager.State if argout != tango.DevState.ALARM: tango.LatestDeviceImpl.dev_state(self) return self.get_state() def dev_status(self): """ This command gets the device status (stored in its device_status data member) and returns it to the caller. :return: Device status :rtype: str """ self.debug_stream("In dev_status()") #----- PROTECTED REGION ID(MetadataManager.Status) ENABLED START -----# self.Connect() #----- PROTECTED REGION END -----# // MetadataManager.Status self.__status = tango.LatestDeviceImpl.dev_status(self) return self.__status def IngestTomodbXML(self, argin): """ Start the ingestion of the given tomodb XML file :param argin: Absolute path to the tomodb XML file :type argin: tango.DevString """ self.debug_stream("In IngestTomodbXML()") #----- PROTECTED REGION ID(MetadataManager.IngestTomodbXML) ENABLED START -----# tomo = tomodbFile(self.attr_proposal_read, self.beamlineID.lower(), argin) self.client.sendObject(tomo) #----- PROTECTED REGION END -----# // MetadataManager.IngestTomodbXML def is_IngestTomodbXML_allowed(self): self.debug_stream("In is_IngestTomodbXML_allowed()") state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.STANDBY, tango.DevState.RUNNING]) #----- PROTECTED REGION ID(MetadataManager.is_IngestTomodbXML_allowed) ENABLED START -----# #----- PROTECTED REGION END -----# // MetadataManager.is_IngestTomodbXML_allowed return state_ok def SetSampleParameters(self, argin): """ Sets the sample parameters, input is an array of successive key-value pairs :param argin: :type argin: tango.DevVarStringArray """ self.debug_stream("In SetSampleParameters()") #----- PROTECTED REGION ID(MetadataManager.SetSampleParameters) ENABLED START -----# if len(argin) % 2 != 0: self.warn_stream("Sample Parameters have to be key-value pairs, last value will be missing") argin.append("missing") for i in range(len(argin) / 2): self.sampleParamMap[argin[i*2]] = argin[i*2+1] #----- PROTECTED REGION END -----# // MetadataManager.SetSampleParameters def is_SetSampleParameters_allowed(self): self.debug_stream("In is_SetSampleParameters_allowed()") state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.RUNNING]) #----- PROTECTED REGION ID(MetadataManager.is_SetSampleParameters_allowed) ENABLED START -----# if state_ok and self.attr_sampleName_read: state_ok = True else: state_ok = False #----- PROTECTED REGION END -----# // MetadataManager.is_SetSampleParameters_allowed return state_ok def GetDefaultFolder(self): return self.DefaultFolder() def GetDataFolderPattern(self): return self.dataFolderPattern def SetDatasetParentLocation(self, datasetParentLocation): self.datasetParentLocation = datasetParentLocation self.info_stream("Set datasetParentLocation. datasetParentLocation=%s metadatamanager=%s" % (self.datasetParentLocation, self.get_name())) def GetDatasetParentLocation(self): return self.datasetParentLocation def ClearDatasetParentLocation(self): self.datasetParentLocation = '' self.info_stream("Clear datasetParentLocation. datasetParentLocation=%s metadatamanager=%s" % (self.datasetParentLocation, self.get_name())) def StartDataset(self): """ Indicates the beginning of a dataset. Sets the device in the RUNNING state. """ try: self.expdev.lock(tango.constants.DEFAULT_LOCK_VALIDITY) self.running = True self.set_state(tango.DevState.RUNNING) self.startDate = datetime.now() self.entries = dict() self.files = [] self.logger.info("Dataset is starting. proposal=%s sample=%s datasetName=%s metadatamanager=%s datasetFolder=%s" % (self.get_proposal(), self.get_sample_name(), self.get_dataset_name(), self.get_name(), self.DefaultFolder())) tangoworker.execute(self._export_initial) self.logger.info("Dataset is started. proposal=%s sample=%s datasetName=%s metadatamanager=%s datasetFolder=%s" % (self.get_proposal(), self.get_sample_name(), self.get_dataset_name(), self.get_name(), self.DefaultFolder())) for datasetParam in self.datasetParamMap: self.datasetParamMap[datasetParam] = '' self.customParamMap = dict() except: traceback.print_exc(file=sys.stdout) def is_StartDataset_allowed(self): return not(self.get_state() in [tango.DevState.FAULT,tango.DevState.OFF,tango.DevState.STANDBY,tango.DevState.RUNNING]) def EndDataset(self): """ Indicates the end of the dataset in normal conditions. Triggers reporting of metadata. """ self.endDate = datetime.now() dataset = self.CreateDataset() self.logger.info("Dataset is ending. proposal=%s sample=%s datasetName=%s metadatamanager=%s datasetFolder=%s" % (self.get_proposal(), self.get_sample_name(), self.get_dataset_name(), self.get_name(), self.DefaultFolder())) tangoworker.execute(self._export_final, dataset, self.metadataFileName, self.entries, self.files) # cleanup self.CleanupDatafiles() self.running = False self.UpdateState() # keep dataset name in list self.attr_datasetNameList_read.append(self.attr_datasetName_read) # clean the dataset self.logger.info("Dataset has ended. proposal=%s sample=%s datasetName=%s metadatamanager=%s datasetFolder=%s" % (self.get_proposal(), self.get_sample_name(), self.get_dataset_name(), self.get_name(), self.DefaultFolder())) self.proxy.write_attribute("datasetName", "") self.expdev.unlock() def is_EndDataset_allowed(self): self.debug_stream("In is_EndDataset_allowed()") state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.ON, tango.DevState.OFF, tango.DevState.STANDBY]) return state_ok # This sends a notification to the electronic logbook (ICAT+) def sendNotification(self, eventType, category, message): try: if requests: # The content of the message content = [{"format" : "plainText", "text" : message}] datasetName = None if self.attr_datasetName_read : datasetName = self.attr_datasetName_read params = dict( { "type" : eventType, "datasetName" : datasetName, "category" : category, "content" : content, "creationDate" : str(datetime.now().isoformat()) } ) # Some versions of requests have not got this configuration #requests.adapters.DEFAULT_RETRIES = 1 headers = {'Content-Type': 'application/json', 'Accept':'application/json'} url = self.icatplus_server + "/logbook/" + str(self.API_KEY) + "/investigation/name/" + str(self.get_proposal()) + "/instrument/name/" + str(self.beamlineID.lower()) + "/event" self.logger.debug(url) requests.post(url, data=(json.dumps(params)), headers=headers, timeout=0.1) else: self.logger.info("No requests installed") except Exception as e: sys.excepthook(*sys.exc_info()) pass def uploadBase64(self, base64): try: if requests: params = dict( { "base64" : base64, "creationDate" : str(datetime.now().isoformat()) } ) headers = {'Content-Type': 'application/json', 'Accept':'application/json'} url = self.icatplus_server + "/logbook/" + str(self.API_KEY) + "/investigation/name/" + str(self.get_proposal()) + "/instrument/name/" + str(self.beamlineID.lower()) + "/event/createFrombase64" self.logger.debug(url) requests.post(url, data=(json.dumps(params)), headers=headers, timeout=0.1) else: self.logger.info("No requests installed") except Exception as e: sys.excepthook(*sys.exc_info()) pass def notifyInfo(self, message): self.sendNotification("notification", "info", message) def notifyDebug(self, message): self.sendNotification("notification", "debug", message) def notifyCommand(self, message): self.sendNotification("notification", "commandLine", message) def notifyError(self, message): self.sendNotification("notification", "error", message) def userComment(self, message): self.sendNotification("annotation", "comment", message) def InterruptDataset(self): self.debug_stream("In InterruptDataset()") self.endDate = datetime.now() dts = self.CreateDataset(False) tangoworker.execute(self._export_final, dts, self.metadataFileName, self.entries, self.files) self.CleanupDatafiles() self.running = False self.UpdateState() self.attr_datasetNameList_read.append(self.attr_datasetName_read) self.proxy.write_attribute("datasetName", "") self.expdev.unlock() self.debug_stream("InterruptDataset() done") #----- PROTECTED REGION END -----# // MetadataManager.EndDataset def is_InterruptDataset_allowed(self): self.debug_stream("In is_EndDataset_allowed()") state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.ON, tango.DevState.OFF, tango.DevState.STANDBY]) #----- PROTECTED REGION ID(MetadataManager.is_EndDataset_allowed) ENABLED START -----# #----- PROTECTED REGION END -----# // MetadataManager.is_EndDataset_allowed return state_ok def AbortDataset(self): """ Indicates the abnormal termination of a running dataset. Does not report metadata. """ self.debug_stream("In AbortDataset()") #----- PROTECTED REGION ID(MetadataManager.AbortDataset) ENABLED START -----# self.CleanupDatafiles() self.running = False self.set_state(tango.DevState.ON) self.expdev.unlock() #----- PROTECTED REGION END -----# // MetadataManager.AbortDataset def is_AbortDataset_allowed(self): self.debug_stream("In is_AbortDataset_allowed()") state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.ON, tango.DevState.OFF, tango.DevState.STANDBY]) #----- PROTECTED REGION ID(MetadataManager.is_AbortDataset_allowed) ENABLED START -----# #----- PROTECTED REGION END -----# // MetadataManager.is_AbortDataset_allowed return state_ok def Reset(self): """ Resets the device to the OFF state. Clears attributes. """ self.debug_stream("In Reset()") #----- PROTECTED REGION ID(MetadataManager.Reset) ENABLED START -----# # clears the proposal, this cascade clears the sampleName, sample parameters and datasetName self.proxy.write_attribute("datasetName", "") #----- PROTECTED REGION END -----# // MetadataManager.Reset def is_Reset_allowed(self): self.debug_stream("In is_Reset_allowed()") state_ok = not(self.get_state() in [tango.DevState.FAULT, tango.DevState.RUNNING]) #----- PROTECTED REGION ID(MetadataManager.is_Reset_allowed) ENABLED START -----# #----- PROTECTED REGION END -----# // MetadataManager.is_Reset_allowed return state_ok def addParametersToDataSet(self, dataset, parameters): for parameter in parameters: dataset.add_parameter(parameter) return dataset #----- PROTECTED REGION ID(MetadataManager.programmer_methods) ENABLED START -----# def _export_final(self, dataset, metadatafilename, entries, files, ): self.metadataWorker.exportFinal(dataset, metadatafilename, entries, files, self.attr_metadataFile_read, self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read, self.getParameters(), self.client) def _export_initial(self): try: # now we need to make sure the datafolder attribute is correct if not self.attr_dataFolder_read: self.attr_dataFolder_read = self.DefaultFolder() # record h5 file name for later stage if os.path.exists(self.attr_dataFolder_read) and os.access(self.attr_dataFolder_read, os.W_OK | os.X_OK): self.metadataFileName = self.metadataWorker.MakeFileName(self.attr_dataFolder_read, "h5", self.attr_metadataFile_read, self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read, True) else: self.warn_stream("Folder %s does not exist, or is not writable, will save data in local folder" % self.attr_dataFolder_read) self.metadataFileName = self.metadataWorker.MakeFileName("data", "h5", self.attr_metadataFile_read, self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read, False) # write the initial parameters self.endDate = None #datetime.now() dataset = self.CreateDataset() # we need to prepare everything for async processing self.entries = dict() self.files = [] self.debug_stream("globalHDFfiles %s" % (self.globalHDFfiles)) for f in self.globalHDFfiles: self.files.append(self._replaceInPath(f)) entries = self.metadataWorker.exportInitial(dataset, self.metadataFileName, self.entries, self.files, self.getParameters(), self.sampleParamMap) except Exception as e: traceback.print_exc(file=sys.stdout) self.error_stream("Error exporting %s: %s" % (self.metadataFileName, e)) def _makedict(self, dataset, initial=None): d = dict() self.info_stream(">> makedict i"); d['proposal'] = dataset.get_investigation() d['beamlineID'] = dataset.get_instrument() d['datasetName'] = dataset.get_name() d['location'] = dataset.get_location() d['startDate'] = dataset.get_startDate() d['endDate'] = dataset.get_endDate() d['sampleName'] = dataset.get_sample().get_name() self.info_stream(">> makedict iI"); for p in dataset.get_parameter(): d[p.get_name()] = p.get_value() # add parameters from sample for key in self.sampleParamMap: self.info_stream(key + 'corresponds to' + self.sampleParamMap[key]) d[key] = self.sampleParamMap[key] self.info_stream("\tMAKEDICT : [{0}] ".format(str(d))) return d def Connect(self): try: beat = self.get_attribute_poll_period('State') * 1.2 self.client.setHeartBeat(beat) self.client.beat() self.debug_stream("Connection status: %s" %self.client.getState()) self.UpdateState() except Exception as e: traceback.print_exc(file=sys.stdout) self.warn_stream("Error retrieving poll value: %s" %e) beat = 0 def SetParameters(self, json_string): """This will overwrite ICAT key-value pairs defined by the "parameters" and "labels" device properties, but not the sample parameters. """ self.customParamMap = json.loads(json_string) original = set(self.parameters) | set(self.labels) new = set(self.customParamMap.keys()) overwritten = len(original & new) original = len(original) + len(self.sampleParamMap) - overwritten new = len(new) - overwritten self.info_stream(f"Add parameters ({new} new, {overwritten} overwritten, {original} original)") # Gets the parameters defined on the parameters list in the tango device # Return List def getParameters(self): """Remote and custom parameters. :returns list(parameter): """ try: notallowed = self.notallowed_parameters parameters = [] # add the linked parameters (need to get the value) # async reads of every proxy callids = dict() for devstr, devproxy in self.attributeProxyMap.items(): if devproxy: try: callids[devproxy] = devproxy.read_asynch() except Exception as e: self.error_stream("Error reading attribute %s: %s" %(devstr, e)) # read replies and store values values = dict() for devstr, devproxy in self.attributeProxyMap.items(): v = None if devproxy and devproxy in callids: try: v = devproxy.read_reply(callids[devproxy], 3000, tango.ExtractAs.Numpy).value except Exception as e: self.error_stream("Error getting attribute value %s: %s" %(devstr, e)) values[devstr] = v # loop on param definition and add built values to dataset for n, d in self.parameterDefinitionMap.items(): if d.isRemote(): v = d.buildValue(self.attributeProxyMap, values) if v and n not in notallowed: parameters.append(parameter(n, v)) except: traceback.print_exc(file=sys.stdout) notallowed = self.notallowed_custom parameters += list(parameter(n, v) for n, v in self.customParamMap.items() if n not in notallowed) return parameters @property def notallowed_parameters(self): return set(self.labels_map.keys()) | set(self.customParamMap.keys()) | set(self.sampleParamMap.keys()) @property def notallowed_labels(self): return set(self.customParamMap.keys()) | set(self.sampleParamMap.keys()) @property def notallowed_custom(self): return set(self.sampleParamMap.keys()) def CreateDataset(self, complete=True): # Avoid name collisions: notallowed = self.notallowed_parameters datasetParamMap = {k:v for k,v in self.datasetParamMap.items() if k not in notallowed} notallowed = self.notallowed_labels labels_map = {k:v for k,v in self.labels_map.items() if k not in notallowed} return self.metadataWorker.createDataset( complete, self.attr_proposal_read, self.beamlineID.lower(), self.attr_datasetName_read, self.attr_dataFolder_read, self.startDate, self.endDate, labels_map, datasetParamMap, self.attr_sampleName_read, self.sampleParamMap, self.attr_datafileList_read) def CleanupDatafiles(self): # clean the datafile list self.attr_datafileList_read = [] # clean the last datafile self.proxy.write_attribute("lastDatafile", "") ''' Gets default data root. Replacing dataRoot, proposal, sampleName, datasetName and beamlineId ''' def DefaultFolder(self): if self.datasetParentLocation is '': folder = self._replaceInPath(self.dataFolderPattern) #self.info_stream("Replacing dataRoot folder. dataFolderPattern=%s folder=%s" % (self.dataFolderPattern, folder)) else: patternDatasetParentLocation = os.path.join(self.datasetParentLocation, '{datasetName}') folder = self._replaceInPath(patternDatasetParentLocation) #self.info_stream("Replacing datasetParentLocation folder. datasetParentLocation=%s patternDatasetParentLocation=%s folder=%s" % (self.datasetParentLocation, patternDatasetParentLocation, folder)) return folder def _replaceInPath(self, s): if self.dataRoot: s = s.replace('{dataRoot}', self.dataRoot) if self.attr_proposal_read: s = s.replace('{proposal}', self.metadataWorker.Sanitize(self.attr_proposal_read)) if self.attr_sampleName_read: s = s.replace('{sampleName}', self.metadataWorker.Sanitize(self.attr_sampleName_read)) if self.attr_datasetName_read: s = s.replace('{datasetName}', self.metadataWorker.Sanitize(self.attr_datasetName_read)) s = s.replace('{beamlineID}', self.metadataWorker.Sanitize(self.beamlineID.lower())) return os.path.normpath(s) def UpdateState(self): if self.running: self.set_state(tango.DevState.RUNNING) self.set_status("Dataset running") elif not self.attr_proposal_read: self.set_state(tango.DevState.OFF) self.set_status("No experiment started") elif self.attr_sampleName_read and self.attr_datasetName_read: self.set_state(tango.DevState.ON) self.set_status("Ready to start dataset") else: self.set_state(tango.DevState.STANDBY) self.set_status("Experiment " + self.attr_proposal_read + " started") try: self.append_status(self.client.getStatus(),True) except Exception: self.debug_stream("Error appending status") def push_event(self, ev): self.info_stream("Receiving event: " + ev.attr_name) if ev.err is True: self.error_stream("Error receiving event") elif ev.attr_name.endswith(self.metaExperimentDevice + "/proposal"): data = ev.attr_value.value if 'please enter' == data: data = '' if data != self.attr_proposal_read: self.attr_proposal_read=data self.attr_datasetNameList_read = [] self.UpdateState() elif ev.attr_name.endswith(self.metaExperimentDevice + "/sample"): # does nothing if the sample name does not change data = ev.attr_value.value if 'please enter' == data: data = '' if data != self.attr_sampleName_read: # we clear sample parameters on a new sample self.sampleParamMap.clear() # clears dataset name if self.attr_sampleName_read and not(self.get_state() == tango.DevState.OFF): self.proxy.write_attribute("datasetName", "") self.attr_sampleName_read=data self.UpdateState() elif ev.attr_name.endswith(self.metaExperimentDevice + "/dataroot"): self.dataRoot = ev.attr_value.value else: self.error_stream("Received event for unknown attribute:" + ev.attr_name) # Update data folder for each change self.attr_dataFolder_read = self.DefaultFolder() def appendMessage(self, msg): self.msgList.appendleft(msg) def getID(self): return "%s:%s/%s" % (self.proxy.get_db_host(), self.proxy.get_db_port(), self.get_name()) #----- PROTECTED REGION END -----# // MetadataManager.programmer_methods def SetDatasetFiles(self, attr): self.attr_datafileList_read = attr class MetadataManagerClass(tango.DeviceClass): #--------- Add you global class variables here -------------------------- #----- PROTECTED REGION ID(MetadataManager.global_class_variables) ENABLED START -----# #----- PROTECTED REGION END -----# // MetadataManager.global_class_variables def dyn_attr(self, dev_list): """Invoked to create dynamic attributes for the given devices. Default implementation calls :meth:`MetadataManager.initialize_dynamic_attributes` for each device :param dev_list: list of devices :type dev_list: :class:`tango.DeviceImpl`""" for dev in dev_list: try: dev.initialize_dynamic_attributes() except: import traceback dev.warn_stream("Failed to initialize dynamic attributes") dev.debug("Details: " + traceback.format_exc()) #----- PROTECTED REGION ID(MetadataManager.dyn_attr) ENABLED START -----# #----- PROTECTED REGION END -----# // MetadataManager.dyn_attr # Class Properties class_property_list = { 'queueURLs': [tango.DevVarStringArray, "URLs of the queues (including port)", ["bcu-mq-01.esrf.fr:61613"]], 'queueName': [tango.DevString, "Name of the stomp queue for sending ingestion requests", ["/queue/icatIngest"] ], 'authenticationPlugin': [tango.DevString, "Name of the authentication plugin needed for ICAT (Only needed for checking parameters, not for ingestion)", ["db"] ], 'username': [tango.DevString, "Username used for connecting to ICAT", ["reader"] ], 'password': [tango.DevString, "Password used for connecting to ICAT", ["reader"] ], 'server': [tango.DevString, "Name of the machine where ICAT server is running", ["icat.esrf.fr"] ], 'port': [tango.DevString, "Port where ICAT server is running", ["80"] ], 'icatplus_server': [tango.DevString, "Server where ICAT+ is running", ["https://icatplus.esrf.fr"] ], 'graylogServer': [tango.DevString, "Graylog Server", ["graylog-dau.esrf.fr"] ], 'graylogPort': [tango.DevInt, "Graylog port", [12203] ], 'API_KEY': [tango.DevString, "API_KEY for connecting to ICAT+", [""] ], 'jolokiaPort': [tango.DevInt, "Jolokia port (same host as queueURLs)", [8778] ] } # Device Properties device_property_list = { 'beamlineID': [tango.DevString, "Beamline ID for reporting metadata", [] ], 'parameters': [tango.DevVarStringArray, "Names of default dataset parameters.", []], 'labels': [tango.DevVarStringArray, "List of labels.", []], 'globalHDFfiles': [tango.DevVarStringArray, "List of proposal level hdf5 files to write.\nAllowed attributes are:\n{dataRoot} : data root folder (default to /data/visitor)\n{proposal} : proposal name\n{beamlineID} : beamline name", ["{dataRoot}/{proposal}/{beamlineID}/{proposal}-{beamlineID}.h5"]], 'dataFolderPattern': [tango.DevString, "Defines the absolute path to the data folder as a pattern based on attributes.\nAllowed attributes are:\n{dataRoot} : data root folder (default to /data/visitor)\n{proposal} : proposal name\n{beamlineID} : beamline name\n{sampleName} : sample name\n{datasetName} : dataset name", ["{dataRoot}/{proposal}/{beamlineID}/{sampleName}/{datasetName}"] ], 'metaExperimentDevice': [tango.DevString, "Device that manages proposal and sample", [] ] } # Command definitions cmd_list = { 'SetDatasetFiles': [[tango.DevVarStringArray, "none"], [tango.DevVoid, "none"]], 'IngestTomodbXML': [[tango.DevString, "Absolute path to the tomodb XML file"], [tango.DevVoid, "none"]], 'SetSampleParameters': [[tango.DevVarStringArray, "none"], [tango.DevVoid, "none"]], 'StartDataset': [[tango.DevVoid, "none"], [tango.DevVoid, "none"]], 'EndDataset': [[tango.DevVoid, "none"], [tango.DevVoid, "none"]], 'AbortDataset': [[tango.DevVoid, "none"], [tango.DevVoid, "none"]], 'InterruptDataset': [[tango.DevVoid, "none"], [tango.DevVoid, "none"]], 'Reset': [[tango.DevVoid, "none"], [tango.DevVoid, "none"]], 'CheckParameters': [[tango.DevVoid, "none"], [tango.DevString, "none"]], 'GetDefaultFolder': [[tango.DevVoid, "none"], [tango.DevString, "none"]], 'GetDataFolderPattern': [[tango.DevVoid, "none"], [tango.DevString, "none"]], 'SetDatasetParentLocation': [[tango.DevString, "none"], [tango.DevVoid, "none"]], 'GetDatasetParentLocation': [[tango.DevVoid, "none"], [tango.DevString, "none"]], 'ClearDatasetParentLocation': [[tango.DevVoid, "none"], [tango.DevVoid, "none"]], 'uploadBase64': [[tango.DevString, "Message"], [tango.DevVoid, "none"]], 'notifyError': [[tango.DevString, "Message"], [tango.DevVoid, "none"]], 'notifyInfo': [[tango.DevString, "Message"], [tango.DevVoid, "none"]], 'notifyDebug': [[tango.DevString, "Message"], [tango.DevVoid, "none"]], 'notifyCommand': [[tango.DevString, "Message"], [tango.DevVoid, "none"]], 'userComment': [[tango.DevString, "Message"], [tango.DevVoid, "none"]], "SetParameters": [ [tango.DevString, "JSON-representation of a parameter dictionary (overwrites)"], [tango.DevVoid, "none"], ], } # Attribute definitions attr_list = { 'beamlineID': [[tango.DevString, tango.SCALAR, tango.READ], { 'description': "Proposal ID", } ], 'proposal': [[tango.DevString, tango.SCALAR, tango.READ], { 'description': "Proposal ID", } ], 'sampleName': [[tango.DevString, tango.SCALAR, tango.READ], { 'description': "Sample name", } ], 'datasetName': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'description': "Dataset number", 'Memorized':"true_without_hard_applied" } ], 'metadataFile': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'description': "Name of the file for storing metadata", 'Memorized':"true_without_hard_applied" } ], 'lastDatafile': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'description': "Name of the last datafile registered", } ], 'dataFolder': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'description': "Absolute path to the data folder", 'Memorized':"true_without_hard_applied" } ], 'datasetState': [[tango.DevString, tango.SCALAR, tango.READ], { 'description': "dataset staet", } ], 'sampleParameters': [[tango.DevString, tango.SPECTRUM, tango.READ, 256], { 'description': "Array of key-value pairs dataset parameters", } ], 'datafileList': [[tango.DevString, tango.SPECTRUM, tango.READ, 256], { 'description': "Full list of datafiles registered for this dataset", } ], 'datasetNameList': [[tango.DevString, tango.SPECTRUM, tango.READ, 256]], 'messageList': [[tango.DevString, tango.SPECTRUM, tango.READ, 20]] } def main(): try: py = tango.Util(sys.argv) py.add_class(MetadataManagerClass, MetadataManager, 'MetadataManager') #----- PROTECTED REGION ID(MetadataManager.add_classes) ENABLED START -----# #----- PROTECTED REGION END -----# // MetadataManager.add_classes U = tango.Util.instance() U.server_init() U.server_run() except tango.DevFailed as e: print ('-------> Received a DevFailed exception:',e) except Exception as e: print ('-------> An unforeseen exception occured....',e) if __name__ == '__main__': main()