#!/usr/bin/env python
# -*- coding:utf-8 -*-
##############################################################################
## license :
##============================================================================
##
## File : MetadataManager.py
##
## Project : Metadata Manager for ICAT
##
## This file is part of Tango device class.
##
## Tango is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## Tango is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Tango. If not, see <http://www.gnu.org/licenses/>.
##
##
## $Author : christophe.cleva$
##
## $Revision : $
##
## $Date : $
##
## $HeadUrl : $
##============================================================================
## This file is generated by POGO
## (Program Obviously used to Generate tango Object)
##############################################################################
__all__ = ["MetadataManager", "MetadataManagerClass", "main"]
__docformat__ = 'restructuredtext'
import tango
import sys
# Add additional import
#----- PROTECTED REGION ID(MetadataManager.additionnal_import) ENABLED START -----#
import os.path
import sys, traceback
import string
import json
from datetime import datetime
from time import gmtime, strftime
import logging
try:
import requests
except ImportError:
requests = None
try:
import graypy
except ImportError:
graypy = None
from collections import deque
from metadata_manager.icat_ingest_metadata.tomodb import tomodbFile
from metadata_manager.icat_ingest_metadata.dataset import dataset, parameter, sample, datafile
from metadata_manager.MessagingClient import StompClient
from metadata_manager.TangoLoggingHandler import TangoLoggingHandler
from metadata_manager.ParameterDefinition import ParameterDefinition
from metadata_manager.ICATParameterReader import ICATParameterReader
from metadata_manager.MetadataWorker import MetadataWorker
from metadata_manager import tangoworker
#----- PROTECTED REGION END -----# // MetadataManager.additionnal_import
## Device States Description
## FAULT : Device is not functioning correctly
## ON : Device is ready to record dataset
## OFF : No experiment ongoing
## STANDBY : Experiment started, sample or dataset name is missing
## RUNNING : Dataset currently recorded
class MetadataManager (tango.LatestDeviceImpl):
""""""
def __init__(self, cl, name):
    """Construct the device: base-class init, create the MetadataWorker
    helper, then run init_device() (POGO convention)."""
    tango.LatestDeviceImpl.__init__(self, cl, name)
    self.metadataWorker = MetadataWorker(self.get_name())
    MetadataManager.init_device(self)
def delete_device(self):
    """Tear down the device: release the MetaExperiment lock if held,
    unsubscribe the three change-event subscriptions, and disconnect the
    messaging client. Every step is best-effort and logged on failure."""
    self.debug_stream("In delete_device()")
    #----- PROTECTED REGION ID(MetadataManager.delete_device) ENABLED START -----#
    # release the MetaExperiment lock only if we are the lock holder
    try:
        if self.expdev.is_locked_by_me():
            try:
                self.expdev.unlock()
            except Exception as e:
                self.error_stream("Error unlocking MetaExp: %s" % e)
    except Exception as e:
        self.warn_stream("Error accessing MetaExp: %s" % e)
    # drop the subscriptions created in init_device
    try:
        self.expdev.unsubscribe_event(self.proposalEventId)
    except Exception as e:
        self.error_stream("Error unsubscribing: %s" % e)
    try:
        self.expdev.unsubscribe_event(self.sampleEventId)
    except Exception as e:
        self.error_stream("Error unsubscribing: %s" % e)
    try:
        self.expdev.unsubscribe_event(self.dataRootEventId)
    except Exception as e:
        self.error_stream("Error unsubscribing: %s" % e)
    # NOTE(review): raises AttributeError if init_device failed before the
    # client was created (self.client is None) — confirm acceptable here.
    self.client.disconnect()
    #----- PROTECTED REGION END -----# // MetadataManager.delete_device
def init_device(self):
    """Initialise/re-initialise the device.

    Loads device properties, resets attribute caches, sets up logging
    (Tango stream bridge plus optional Graylog via graypy), creates the
    STOMP messaging client and a proxy to itself, subscribes to
    Proposal/Sample/DataRoot change events on the MetaExperiment device,
    and recovers memorized attribute values straight from the Tango
    database. Called from __init__ and on every "Init" command.
    """
    self.get_device_properties(self.get_device_class())
    # POGO-generated attribute read caches
    self.attr_proposal_read = ''
    self.attr_sampleName_read = ''
    self.attr_datasetName_read = ''
    self.attr_metadataFile_read = ''
    self.attr_lastDatafile_read = ''
    self.attr_dataFolder_read = ''
    self.attr_sampleParameters_read = ['']
    self.attr_datafileList_read = ['']
    self.attr_datasetNameList_read = ['']
    #----- PROTECTED REGION ID(MetadataManager.init_device) ENABLED START -----#
    # init members
    self.client = None
    self.proxy = None
    self.running = False
    self.startDate = None
    self.endDate = None
    self.metadataFileName = None
    self.expdev = None
    self.proposalEventId = None
    self.sampleEventId = None
    self.dataRootEventId = None
    self.dataRoot = ''
    self.datasetParentLocation = ''
    # Priority of dataset metadata keys
    # (highest number is highest priority)
    # 1. parameters(local): from attributes of this device
    # 2. labels: from device property
    # 3. parameters(local): from attributes of other devices
    # 4. custom: added with SetParameters
    # 5. sample:
    # 6. globals: proposal, beamline, etc.
    self.datasetParamMap = dict()  # priority (local:1, remote:3)
    self.parameterDefinitionMap = dict()
    self.attributeProxyMap = dict()
    self.labels_map = dict()  # priority (2)
    self.customParamMap = dict()  # priority (4)
    self.sampleParamMap = dict()  # priority (5)
    # the ['']-initialised lists above are replaced by real empty lists
    self.attr_datafileList_read = []
    self.attr_datasetNameList_read = []
    self.msgList = deque(maxlen=20)
    # logger bridge: python logging -> Tango streams
    logging.getLogger().setLevel(logging.DEBUG)
    self.logger = logging.getLogger(self.get_name())
    self.logger.addHandler(TangoLoggingHandler(self))
    if graypy != None:
        # handler class name depends on the graypy version installed
        try:
            self.logger.addHandler(graypy.GELFHandler(self.graylogServer, self.graylogPort))
        except:
            self.logger.addHandler(graypy.GELFUDPHandler(self.graylogServer, self.graylogPort))
    # proxy to this very device (used to write memorized attributes)
    self.proxy = tango.DeviceProxy(self.get_name())
    # JMS client
    # we need the proxy to be created first
    self.client = StompClient(self.queueURLs, self.queueName, self.beamlineID, manager=self, jolokia_port=self.jolokiaPort)
    self.logger.info("Init MetadataManager. url=%s, queue=%s metadatamanager=%s beamline=%s" % (self.client.getConfigURL(), self.queueName, self.get_name(), self.beamlineID))
    self.Connect()
    # MetaExp client + change-event subscriptions (stateless: retry later
    # if the device is not up yet)
    self.expdev = tango.DeviceProxy(self.metaExperimentDevice)
    self.proposalEventId = self.expdev.subscribe_event("Proposal", tango.EventType.CHANGE_EVENT, self, stateless=True)
    self.sampleEventId = self.expdev.subscribe_event("Sample", tango.EventType.CHANGE_EVENT, self, stateless=True)
    self.dataRootEventId = self.expdev.subscribe_event("DataRoot", tango.EventType.CHANGE_EVENT, self, stateless=True)
    # replicate attributes initialisation without calling write_XXX methods
    db = tango.Util.instance().get_database()
    attr_map = db.get_device_attribute_property(self.get_name(), ['datasetName', 'dataFolder', 'metadataFile'])
    self.attr_datasetName_read = attr_map['datasetName'].get('__value', [''])[0]
    self.attr_dataFolder_read = attr_map['dataFolder'].get('__value', [''])[0]
    self.attr_metadataFile_read = attr_map['metadataFile'].get('__value', [''])[0]
    if self.attr_proposal_read:
        self.info_stream("Recovered proposal %s" % self.attr_proposal_read)
    if self.attr_sampleName_read:
        self.info_stream("Recovered sample name %s" % self.attr_sampleName_read)
    if self.attr_datasetName_read:
        self.info_stream("Recovered dataset name %s" % self.attr_datasetName_read)
    if self.attr_dataFolder_read:
        self.info_stream("Recovered data folder %s" % self.attr_dataFolder_read)
    if self.attr_metadataFile_read:
        self.info_stream("Recovered metadata file %s" % self.attr_metadataFile_read)
    self.info_stream("Device init. Proposal=%s sample=%s dataset=%s dataFolder=%s metadataFile=%s" % (self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read, self.attr_dataFolder_read, self.attr_metadataFile_read))
    # update state based on attributes
    self.UpdateState()
'''
Some getters for me to easily retrieve the info we will most likely use to log messages
'''
def get_proposal(self):
    """Return the cached proposal name (may be '')."""
    return self.attr_proposal_read
def get_sample_name(self):
    """Return the cached sample name (may be '')."""
    return self.attr_sampleName_read
def get_dataset_name(self):
    """Return the cached dataset name (may be '')."""
    return self.attr_datasetName_read
def get_dataroot(self):
    """Return the data root last received from the MetaExperiment device."""
    return self.dataRoot
def always_executed_hook(self):
    """Hook executed before every command/attribute access; nothing to do."""
    return
    #----- PROTECTED REGION ID(MetadataManager.always_executed_hook) ENABLED START -----#
    #----- PROTECTED REGION END -----# // MetadataManager.always_executed_hook
#--------------------------------------------------------------------------
# MetadataManager read/write attribute methods
#--------------------------------------------------------------------------
def read_beamlineID(self, attr):
    """Expose the beamlineID device property, normalised to lower case."""
    attr.set_value(self.beamlineID.lower())
def read_proposal(self, attr):
    """Return the cached proposal name."""
    attr.set_value(self.attr_proposal_read)
def read_datasetState(self, attr):
    """Expose the current device state as a string attribute."""
    attr.set_value(str(self.dev_state()))
def is_proposal_allowed(self, attr):
    """proposal is readable unless FAULT, writable unless FAULT or RUNNING."""
    current = self.get_state()
    if attr == tango.AttReqType.READ_REQ:
        blocked = (tango.DevState.FAULT,)
    else:
        blocked = (tango.DevState.FAULT, tango.DevState.RUNNING)
    return current not in blocked
def read_sampleName(self, attr):
    """Return the cached sample name, logging the read for traceability."""
    value = self.attr_sampleName_read
    attr.set_value(value)
    self.logger.info("read_sampleName. sampleName=%s metadatamanager=%s" % (value, self.get_name()))
def is_sampleName_allowed(self, attr):
    """sampleName is readable unless FAULT/OFF, writable unless FAULT/OFF/RUNNING."""
    current = self.get_state()
    if attr == tango.AttReqType.READ_REQ:
        blocked = (tango.DevState.FAULT, tango.DevState.OFF)
    else:
        blocked = (tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.RUNNING)
    return current not in blocked
def read_datasetName(self, attr):
    """Return the cached dataset name."""
    attr.set_value(self.attr_datasetName_read)
def write_datasetName(self, attr):
    """Set a new dataset name.

    Clears dataFolder and metadataFile (when their write is allowed in the
    current state) via the self-proxy so the memorized values are reset for
    the new dataset, then recomputes the device state."""
    datasetName = attr.get_write_value()
    # (disabled) uniqueness check for dataset names within a proposal:
    #if datasetName in self.attr_datasetNameList_read:
    #  self.logger.error("datasetName already used. datasetName=%s metadatamanager=%s" % (datasetName, self.get_name()))
    #  tango.Except.throw_exception("datasetName already used: %s" % datasetName, "datasetNames have to be unique for a proposal", "datasetName")
    if self.is_dataFolder_allowed(tango.AttReqType.WRITE_REQ):
        # write through the proxy so the memorized attribute is cleared too
        self.proxy.write_attribute("dataFolder", "")
        self.attr_dataFolder_read = ""
    if self.is_metadataFile_allowed(tango.AttReqType.WRITE_REQ):
        self.proxy.write_attribute("metadataFile", "")
        self.attr_metadataFile_read = ""
    self.attr_datasetName_read = datasetName
    self.UpdateState()
def is_datasetName_allowed(self, attr):
    """datasetName additionally requires a sample name; readable unless
    FAULT/OFF, writable unless FAULT/OFF/RUNNING."""
    current = self.get_state()
    if attr == tango.AttReqType.READ_REQ:
        blocked = (tango.DevState.FAULT, tango.DevState.OFF)
    else:
        blocked = (tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.RUNNING)
    return bool(current not in blocked and self.attr_sampleName_read)
def read_metadataFile(self, attr):
    """Return the sanitized metadata file name, or a default derived from
    proposal/sample/dataset when none has been written yet."""
    name = self.attr_metadataFile_read
    if name:
        value = self.metadataWorker.Sanitize(name)
    else:
        value = self.metadataWorker.DefaultFileName(self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read)
    attr.set_value(value)
def write_metadataFile(self, attr):
    """Store the user-supplied metadata file name."""
    self.attr_metadataFile_read = attr.get_write_value()
def is_metadataFile_allowed(self, attr):
    """metadataFile is readable unless FAULT/OFF/STANDBY, writable unless
    FAULT/OFF/STANDBY/RUNNING."""
    current = self.get_state()
    if attr == tango.AttReqType.READ_REQ:
        blocked = (tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.STANDBY)
    else:
        blocked = (tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.STANDBY, tango.DevState.RUNNING)
    return current not in blocked
def read_lastDatafile(self, attr):
    """Return the most recently recorded datafile path."""
    attr.set_value(self.attr_lastDatafile_read)
def write_lastDatafile(self, attr):
    """Record a new datafile: append it to the datafile list (skipping empty
    values and immediate duplicates) and remember it as the last one."""
    data = attr.get_write_value()
    if not data or data == self.attr_lastDatafile_read:
        return
    self.attr_datafileList_read.append(data)
    self.attr_lastDatafile_read = data
def is_lastDatafile_allowed(self, attr):
    """lastDatafile is accessible only while a dataset exists; the same
    rule applies to reads and writes (not FAULT/ON/OFF/STANDBY)."""
    blocked = (tango.DevState.FAULT,
               tango.DevState.ON,
               tango.DevState.OFF,
               tango.DevState.STANDBY)
    return self.get_state() not in blocked
def read_dataFolder(self, attr):
    """Return the data folder, lazily filled from DefaultFolder() when unset."""
    folder = self.attr_dataFolder_read or self.DefaultFolder()
    self.attr_dataFolder_read = folder
    attr.set_value(folder)
def write_dataFolder(self, attr):
    """Store the user-supplied data folder path."""
    self.attr_dataFolder_read = attr.get_write_value()
def is_dataFolder_allowed(self, attr):
    """dataFolder is readable unless FAULT, writable unless
    FAULT/OFF/STANDBY/RUNNING."""
    current = self.get_state()
    if attr == tango.AttReqType.READ_REQ:
        blocked = (tango.DevState.FAULT,)
    else:
        blocked = (tango.DevState.FAULT, tango.DevState.OFF, tango.DevState.STANDBY, tango.DevState.RUNNING)
    return current not in blocked
def read_CheckParameters(self):
    """Read hook for CheckParameters; no cached value to refresh."""
    self.debug_stream("In read_CheckParameters()")
def is_CheckParameters_allowed(self):
    """CheckParameters is allowed in every device state."""
    self.debug_stream("In is_CheckParameters_allowed()")
    return True
def CheckParameters(self):
    """Compare this device's parameter/label definitions against ICAT and
    return the comparison result as a string."""
    icat_reader = ICATParameterReader(self.get_name(), self.authenticationPlugin,
                                      self.username, self.password,
                                      self.server, self.port)
    return str(icat_reader.compare(self.parameters, self.labels))
# Returns the sample parameters set by the SetSampleParameters command as
# successive key/value entries; rebuilt on every call, ["None"] when empty.
def read_sampleParameters(self, attr):
    """Flatten sampleParamMap into [key, value, key, value, ...]."""
    pairs = []
    for key, value in self.sampleParamMap.items():
        pairs.append(key)
        pairs.append(value)
    if not pairs:
        pairs.append("None")
    self.attr_sampleParameters_read = pairs
    attr.set_value(pairs)
def is_sampleParameters_allowed(self, attr):
    """sampleParameters requires a sample name; reads additionally require
    the state to be neither FAULT nor OFF (writes are never state-blocked)."""
    current = self.get_state()
    if attr == tango.AttReqType.READ_REQ:
        state_ok = current not in (tango.DevState.FAULT, tango.DevState.OFF)
    else:
        state_ok = True
    return bool(state_ok and self.attr_sampleName_read)
def read_datafileList(self, attr):
    """Return the recorded datafiles, or ["None"] while the list is empty."""
    attr.set_value(self.attr_datafileList_read or ["None"])
def is_datafileList_allowed(self, attr):
    """datafileList is readable only while a dataset exists (not
    FAULT/ON/OFF/STANDBY); writes are never state-blocked."""
    current = self.get_state()
    if attr == tango.AttReqType.READ_REQ:
        state_ok = current not in (tango.DevState.FAULT,
                                   tango.DevState.ON,
                                   tango.DevState.OFF,
                                   tango.DevState.STANDBY)
    else:
        state_ok = True
    #----- PROTECTED REGION ID(MetadataManager.is_datafileList_allowed) ENABLED START -----#
    #----- PROTECTED REGION END -----# // MetadataManager.is_datafileList_allowed
    return state_ok
def read_datasetNameList(self, attr):
    """Return the names of datasets completed since device start."""
    #----- PROTECTED REGION ID(MetadataManager.datasetNameList_read) ENABLED START -----#
    attr.set_value(self.attr_datasetNameList_read)
    #----- PROTECTED REGION END -----# // MetadataManager.datasetNameList_read
def read_messageList(self, attr):
    """Return the recent message buffer (deque of up to 20 entries)."""
    #----- PROTECTED REGION ID(MetadataManager.messageList_read) ENABLED START -----#
    # NOTE(review): msgList is a collections.deque, not a list — confirm
    # PyTango's set_value accepts arbitrary sequences here.
    attr.set_value(self.msgList)
    #----- PROTECTED REGION END -----# // MetadataManager.messageList_read
def read_parameter(self, attr):
    """Generic read handler for the dynamic parameter attributes: remote
    parameters are resolved through their attribute proxies, local ones are
    served from datasetParamMap."""
    try:
        # NOTE(review): the '' default has no isRemote(); an unknown
        # attribute name raises AttributeError here, which the bare except
        # below swallows (the attribute is then left without a value).
        paramdef = self.parameterDefinitionMap.get(attr.get_name(), '')
        if paramdef.isRemote():
            attr.set_value(paramdef.getValue(self.attributeProxyMap))
        else:
            attr.set_value(self.datasetParamMap.get(attr.get_name(), ''))
    except:
        traceback.print_exc(file=sys.stdout)
def read_label(self, attr):
    """Read handler for the fixed-value label attributes (labels property)."""
    attr.set_value(self.labels_map[attr.get_name()])
def write_parameter(self, attr):
    """Store a locally written dynamic-parameter value in datasetParamMap."""
    self.debug_stream("In write_parameter()")
    self.datasetParamMap[attr.get_name()] = attr.get_write_value()
def initialize_dynamic_attributes(self):
    """Create the dynamic Tango attributes declared by the 'parameters' and
    'labels' device properties.

    Local parameters become memorized READ_WRITE string attributes, remote
    parameters become READ string attributes resolved through attribute
    proxies, and labels become READ string attributes with fixed values.
    """
    # create dynamic parameters as defined in the device property
    # these need to be set_memorized_init(True) to be registered in the map on initialisation
    if self.parameters:
        for p in self.parameters:
            # create parameter definiton
            d = ParameterDefinition(p)
            pname = d.getParameterName()
            self.parameterDefinitionMap[pname] = d
            # add attributes: local -> RW, remote -> RO
            if d.isLocal():
                self.info_stream("Local attribute: %s" % pname)
                param = tango.Attr(pname, tango.DevString, tango.READ_WRITE)
                param.set_memorized()
                #param.set_memorized_init(True)
                param.set_memorized_init(False)
                self.add_attribute(param, MetadataManager.read_parameter, MetadataManager.write_parameter, None)
                # register an empty value on creation, will be populated on initialization
                self.datasetParamMap[pname] = ''
            else:
                self.info_stream("Remote attribute: %s - from: %s" % (pname, d.getParameterDefinition()))
                param = tango.Attr(pname, tango.DevString, tango.READ)
                self.add_attribute(param, MetadataManager.read_parameter, None, None)
                # create device proxies for all remote attributes
                for dev in d.getRemoteAttributes():
                    if dev not in self.attributeProxyMap:
                        try:
                            self.attributeProxyMap[dev] = tango.AttributeProxy(dev)
                        except Exception as e:
                            self.error_stream("Error getting attribute proxy for %s: %s" % (dev, e))
    if self.labels:
        for l in self.labels:
            # label lines have the form "name=value"
            aa = l.split("=")
            if len(aa) > 1:
                pname = aa[0].strip()
                self.info_stream("Label: %s" % pname)
                param = tango.Attr(pname, tango.DevString, tango.READ)
                self.add_attribute(param, MetadataManager.read_label, None, None)
                # NOTE(review): the value is not stripped, and a value
                # containing '=' is truncated at the first '=' — confirm.
                self.labels_map[pname] = aa[1]
def read_attr_hardware(self, data):
    """Hardware read hook; this device has no hardware to poll."""
    self.debug_stream("In read_attr_hardware()")
#--------------------------------------------------------------------------
# MetadataManager command methods
#--------------------------------------------------------------------------
def dev_state(self):
    """ This command gets the device state (stored in its device_state data member) and returns it to the caller.
    :return: Device state
    :rtype: tango.CmdArgType.DevState
    """
    self.debug_stream("In dev_state()")
    argout = tango.DevState.UNKNOWN
    #----- PROTECTED REGION ID(MetadataManager.State) ENABLED START -----#
    # refresh the messaging connection / device state on every state poll
    self.Connect()
    #----- PROTECTED REGION END -----# // MetadataManager.State
    # NOTE(review): argout is never reassigned after UNKNOWN, so this
    # condition is always true and the base-class dev_state() always runs
    # (its return value is discarded) — confirm this POGO pattern is intended.
    if argout != tango.DevState.ALARM:
        tango.LatestDeviceImpl.dev_state(self)
    return self.get_state()
def dev_status(self):
    """ This command gets the device status (stored in its device_status data member) and returns it to the caller.
    :return: Device status
    :rtype: str
    """
    self.debug_stream("In dev_status()")
    #----- PROTECTED REGION ID(MetadataManager.Status) ENABLED START -----#
    # refresh the messaging connection / device state on every status poll
    self.Connect()
    #----- PROTECTED REGION END -----# // MetadataManager.Status
    self.__status = tango.LatestDeviceImpl.dev_status(self)
    return self.__status
def IngestTomodbXML(self, argin):
    """ Start the ingestion of the given tomodb XML file
    :param argin: Absolute path to the tomodb XML file
    :type argin: tango.DevString
    """
    self.debug_stream("In IngestTomodbXML()")
    #----- PROTECTED REGION ID(MetadataManager.IngestTomodbXML) ENABLED START -----#
    # wrap the XML with proposal/beamline context and publish it on the
    # STOMP queue for ICAT ingestion
    tomo = tomodbFile(self.attr_proposal_read, self.beamlineID.lower(), argin)
    self.client.sendObject(tomo)
    #----- PROTECTED REGION END -----# // MetadataManager.IngestTomodbXML
def is_IngestTomodbXML_allowed(self):
    """Ingestion is allowed only outside FAULT/OFF/STANDBY/RUNNING."""
    self.debug_stream("In is_IngestTomodbXML_allowed()")
    state_ok = self.get_state() not in (tango.DevState.FAULT,
                                        tango.DevState.OFF,
                                        tango.DevState.STANDBY,
                                        tango.DevState.RUNNING)
    #----- PROTECTED REGION ID(MetadataManager.is_IngestTomodbXML_allowed) ENABLED START -----#
    #----- PROTECTED REGION END -----# // MetadataManager.is_IngestTomodbXML_allowed
    return state_ok
def SetSampleParameters(self, argin):
    """ Sets the sample parameters, input is an array of successive key-value pairs
    :param argin:
    :type argin: tango.DevVarStringArray
    """
    self.debug_stream("In SetSampleParameters()")
    #----- PROTECTED REGION ID(MetadataManager.SetSampleParameters) ENABLED START -----#
    # pad an odd-length array so the last key still gets a value
    if len(argin) % 2 != 0:
        self.warn_stream("Sample Parameters have to be key-value pairs, last value will be missing")
        argin.append("missing")
    # BUGFIX: use floor division — on Python 3 `len(argin) / 2` is a float
    # and range() raises "TypeError: 'float' object cannot be interpreted
    # as an integer", so this command always failed.
    for i in range(len(argin) // 2):
        self.sampleParamMap[argin[i*2]] = argin[i*2+1]
    #----- PROTECTED REGION END -----# // MetadataManager.SetSampleParameters
def is_SetSampleParameters_allowed(self):
    """Allowed when a sample name is set and state is not FAULT/OFF/RUNNING."""
    self.debug_stream("In is_SetSampleParameters_allowed()")
    state_ok = self.get_state() not in (tango.DevState.FAULT,
                                        tango.DevState.OFF,
                                        tango.DevState.RUNNING)
    #----- PROTECTED REGION ID(MetadataManager.is_SetSampleParameters_allowed) ENABLED START -----#
    state_ok = bool(state_ok and self.attr_sampleName_read)
    #----- PROTECTED REGION END -----# // MetadataManager.is_SetSampleParameters_allowed
    return state_ok
def GetDefaultFolder(self):
    """Command wrapper exposing DefaultFolder() to clients."""
    return self.DefaultFolder()
def GetDataFolderPattern(self):
    """Return the dataFolderPattern device property."""
    return self.dataFolderPattern
def SetDatasetParentLocation(self, datasetParentLocation):
    """Override the parent location used when building the default data folder."""
    self.datasetParentLocation = datasetParentLocation
    self.info_stream("Set datasetParentLocation. datasetParentLocation=%s metadatamanager=%s" % (self.datasetParentLocation, self.get_name()))
def GetDatasetParentLocation(self):
    """Return the current datasetParentLocation override ('' when unset)."""
    return self.datasetParentLocation
def ClearDatasetParentLocation(self):
    """Drop the datasetParentLocation override (DefaultFolder falls back to
    the dataFolderPattern property)."""
    self.datasetParentLocation = ''
    self.info_stream("Clear datasetParentLocation. datasetParentLocation=%s metadatamanager=%s" % (self.datasetParentLocation, self.get_name()))
def StartDataset(self):
    """ Indicates the beginning of a dataset. Sets the device in the RUNNING state.
    """
    try:
        # lock MetaExperiment so proposal/sample cannot change mid-dataset
        self.expdev.lock(tango.constants.DEFAULT_LOCK_VALIDITY)
        self.running = True
        self.set_state(tango.DevState.RUNNING)
        self.startDate = datetime.now()
        self.entries = dict()
        self.files = []
        self.logger.info("Dataset is starting. proposal=%s sample=%s datasetName=%s metadatamanager=%s datasetFolder=%s" % (self.get_proposal(), self.get_sample_name(), self.get_dataset_name(), self.get_name(), self.DefaultFolder()))
        # write the initial metadata asynchronously on the worker thread
        tangoworker.execute(self._export_initial)
        self.logger.info("Dataset is started. proposal=%s sample=%s datasetName=%s metadatamanager=%s datasetFolder=%s" % (self.get_proposal(), self.get_sample_name(), self.get_dataset_name(), self.get_name(), self.DefaultFolder()))
        # reset per-dataset parameter values and custom parameters
        for datasetParam in self.datasetParamMap:
            self.datasetParamMap[datasetParam] = ''
        self.customParamMap = dict()
    except:
        # NOTE(review): bare except only prints to stdout; the device may be
        # left RUNNING with the lock not taken — confirm acceptable.
        traceback.print_exc(file=sys.stdout)
def is_StartDataset_allowed(self):
    """A dataset can start only outside FAULT/OFF/STANDBY/RUNNING."""
    return self.get_state() not in (tango.DevState.FAULT,
                                    tango.DevState.OFF,
                                    tango.DevState.STANDBY,
                                    tango.DevState.RUNNING)
def EndDataset(self):
    """ Indicates the end of the dataset in normal conditions. Triggers reporting of metadata.
    """
    self.endDate = datetime.now()
    dataset = self.CreateDataset()
    self.logger.info("Dataset is ending. proposal=%s sample=%s datasetName=%s metadatamanager=%s datasetFolder=%s" % (self.get_proposal(), self.get_sample_name(), self.get_dataset_name(), self.get_name(), self.DefaultFolder()))
    # export/ingest the final metadata asynchronously on the worker thread
    tangoworker.execute(self._export_final, dataset, self.metadataFileName, self.entries, self.files)
    # cleanup
    self.CleanupDatafiles()
    self.running = False
    self.UpdateState()
    # keep dataset name in list
    self.attr_datasetNameList_read.append(self.attr_datasetName_read)
    # clean the dataset
    self.logger.info("Dataset has ended. proposal=%s sample=%s datasetName=%s metadatamanager=%s datasetFolder=%s" % (self.get_proposal(), self.get_sample_name(), self.get_dataset_name(), self.get_name(), self.DefaultFolder()))
    self.proxy.write_attribute("datasetName", "")
    # release the MetaExperiment lock taken by StartDataset
    self.expdev.unlock()
def is_EndDataset_allowed(self):
    """Ending a dataset is allowed only while one is running
    (state not FAULT/ON/OFF/STANDBY)."""
    self.debug_stream("In is_EndDataset_allowed()")
    return self.get_state() not in (tango.DevState.FAULT,
                                    tango.DevState.ON,
                                    tango.DevState.OFF,
                                    tango.DevState.STANDBY)
# This sends a notification to the electronic logbook (ICAT+)
def sendNotification(self, eventType, category, message):
    """POST an event to the ICAT+ electronic logbook (best effort).

    :param eventType: event type, e.g. "notification" or "annotation"
    :param category: e.g. "info", "debug", "error", "commandLine", "comment"
    :param message: plain-text body of the event
    """
    try:
        if requests:
            # The content of the message
            content = [{"format": "plainText", "text": message}]
            datasetName = None
            if self.attr_datasetName_read:
                datasetName = self.attr_datasetName_read
            params = dict(
                {
                    "type": eventType,
                    "datasetName": datasetName,
                    "category": category,
                    "content": content,
                    "creationDate": str(datetime.now().isoformat())
                }
            )
            # Some versions of requests have not got this configuration
            #requests.adapters.DEFAULT_RETRIES = 1
            headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
            url = self.icatplus_server + "/logbook/" + str(self.API_KEY) + "/investigation/name/" + str(self.get_proposal()) + "/instrument/name/" + str(self.beamlineID.lower()) + "/event"
            self.logger.debug(url)
            # very short timeout: the logbook must never block the beamline
            requests.post(url,
                          data=(json.dumps(params)),
                          headers=headers,
                          timeout=0.1)
        else:
            self.logger.info("No requests installed")
    except Exception as e:
        # best effort: report through excepthook and carry on
        sys.excepthook(*sys.exc_info())
        pass
def uploadBase64(self, base64):
    """Upload a base64-encoded payload (e.g. an image) as a logbook event
    via the ICAT+ createFrombase64 endpoint (best effort).

    :param base64: base64-encoded content
    """
    try:
        if requests:
            params = dict(
                {
                    "base64": base64,
                    "creationDate": str(datetime.now().isoformat())
                }
            )
            headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
            url = self.icatplus_server + "/logbook/" + str(self.API_KEY) + "/investigation/name/" + str(self.get_proposal()) + "/instrument/name/" + str(self.beamlineID.lower()) + "/event/createFrombase64"
            self.logger.debug(url)
            # very short timeout: the logbook must never block the beamline
            requests.post(url,
                          data=(json.dumps(params)),
                          headers=headers,
                          timeout=0.1)
        else:
            self.logger.info("No requests installed")
    except Exception as e:
        # best effort: report through excepthook and carry on
        sys.excepthook(*sys.exc_info())
        pass
def notifyInfo(self, message):
    """Send an info-level notification to the e-logbook."""
    self.sendNotification("notification", "info", message)
def notifyDebug(self, message):
    """Send a debug-level notification to the e-logbook."""
    self.sendNotification("notification", "debug", message)
def notifyCommand(self, message):
    """Record a command line in the e-logbook."""
    self.sendNotification("notification", "commandLine", message)
def notifyError(self, message):
    """Send an error-level notification to the e-logbook."""
    self.sendNotification("notification", "error", message)
def userComment(self, message):
    """Record a user comment (annotation) in the e-logbook."""
    self.sendNotification("annotation", "comment", message)
def InterruptDataset(self):
    """End the running dataset marked as interrupted: export metadata with
    complete=False, clean up datafiles, record the dataset name, clear the
    datasetName attribute and release the MetaExperiment lock."""
    self.debug_stream("In InterruptDataset()")
    self.endDate = datetime.now()
    # complete=False flags the dataset as interrupted
    dts = self.CreateDataset(False)
    tangoworker.execute(self._export_final, dts, self.metadataFileName, self.entries, self.files)
    self.CleanupDatafiles()
    self.running = False
    self.UpdateState()
    self.attr_datasetNameList_read.append(self.attr_datasetName_read)
    self.proxy.write_attribute("datasetName", "")
    self.expdev.unlock()
    self.debug_stream("InterruptDataset() done")
    #----- PROTECTED REGION END -----# // MetadataManager.EndDataset
def is_InterruptDataset_allowed(self):
    """Interrupting is allowed only while a dataset is running
    (state not FAULT/ON/OFF/STANDBY)."""
    # BUGFIX: the debug message previously said "is_EndDataset_allowed"
    # (copy/paste), which made debug logs misleading.
    self.debug_stream("In is_InterruptDataset_allowed()")
    state_ok = not(self.get_state() in [tango.DevState.FAULT,
                                        tango.DevState.ON,
                                        tango.DevState.OFF,
                                        tango.DevState.STANDBY])
    #----- PROTECTED REGION ID(MetadataManager.is_EndDataset_allowed) ENABLED START -----#
    #----- PROTECTED REGION END -----# // MetadataManager.is_EndDataset_allowed
    return state_ok
def AbortDataset(self):
    """ Indicates the abnormal termination of a running dataset. Does not report metadata.
    """
    self.debug_stream("In AbortDataset()")
    #----- PROTECTED REGION ID(MetadataManager.AbortDataset) ENABLED START -----#
    # no metadata export on abort — just clean up, go back to ON and
    # release the MetaExperiment lock
    self.CleanupDatafiles()
    self.running = False
    self.set_state(tango.DevState.ON)
    self.expdev.unlock()
    #----- PROTECTED REGION END -----# // MetadataManager.AbortDataset
def is_AbortDataset_allowed(self):
    """Abort is allowed only while a dataset is running
    (state not FAULT/ON/OFF/STANDBY)."""
    self.debug_stream("In is_AbortDataset_allowed()")
    state_ok = self.get_state() not in (tango.DevState.FAULT,
                                        tango.DevState.ON,
                                        tango.DevState.OFF,
                                        tango.DevState.STANDBY)
    #----- PROTECTED REGION ID(MetadataManager.is_AbortDataset_allowed) ENABLED START -----#
    #----- PROTECTED REGION END -----# // MetadataManager.is_AbortDataset_allowed
    return state_ok
def Reset(self):
    """ Resets the device to the OFF state. Clears attributes.
    """
    self.debug_stream("In Reset()")
    #----- PROTECTED REGION ID(MetadataManager.Reset) ENABLED START -----#
    # clears the proposal, this cascade clears the sampleName, sample parameters and datasetName
    # (the write goes through the self-proxy so write_datasetName runs and
    # the memorized value is cleared too)
    self.proxy.write_attribute("datasetName", "")
    #----- PROTECTED REGION END -----# // MetadataManager.Reset
def is_Reset_allowed(self):
    """Reset is allowed unless the device is FAULT or RUNNING."""
    self.debug_stream("In is_Reset_allowed()")
    state_ok = self.get_state() not in (tango.DevState.FAULT,
                                        tango.DevState.RUNNING)
    #----- PROTECTED REGION ID(MetadataManager.is_Reset_allowed) ENABLED START -----#
    #----- PROTECTED REGION END -----# // MetadataManager.is_Reset_allowed
    return state_ok
def addParametersToDataSet(self, dataset, parameters):
    """Attach every entry of *parameters* to *dataset* and return it."""
    # local name chosen so it does not shadow the imported `parameter` class
    for extra in parameters:
        dataset.add_parameter(extra)
    return dataset
#----- PROTECTED REGION ID(MetadataManager.programmer_methods) ENABLED START -----#
def _export_final(self, dataset, metadatafilename, entries, files, ):
    """Worker-thread task: delegate the final metadata export/ingestion to
    the MetadataWorker (scheduled via tangoworker.execute)."""
    self.metadataWorker.exportFinal(dataset, metadatafilename, entries, files, self.attr_metadataFile_read, self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read, self.getParameters(), self.client)
def _export_initial(self):
    """Worker-thread task run at StartDataset: resolve the data folder,
    compute the metadata (h5) file name and write the initial entries."""
    try:
        # now we need to make sure the datafolder attribute is correct
        if not self.attr_dataFolder_read:
            self.attr_dataFolder_read = self.DefaultFolder()
        # record h5 file name for later stage; fall back to a local "data"
        # folder when the target is missing or not writable/traversable
        if os.path.exists(self.attr_dataFolder_read) and os.access(self.attr_dataFolder_read, os.W_OK | os.X_OK):
            self.metadataFileName = self.metadataWorker.MakeFileName(self.attr_dataFolder_read, "h5", self.attr_metadataFile_read, self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read, True)
        else:
            self.warn_stream("Folder %s does not exist, or is not writable, will save data in local folder" % self.attr_dataFolder_read)
            self.metadataFileName = self.metadataWorker.MakeFileName("data", "h5", self.attr_metadataFile_read, self.attr_proposal_read, self.attr_sampleName_read, self.attr_datasetName_read, False)
        # write the initial parameters
        self.endDate = None  #datetime.now()
        dataset = self.CreateDataset()
        # we need to prepare everything for async processing
        self.entries = dict()
        self.files = []
        self.debug_stream("globalHDFfiles %s" % (self.globalHDFfiles))
        for f in self.globalHDFfiles:
            self.files.append(self._replaceInPath(f))
        entries = self.metadataWorker.exportInitial(dataset, self.metadataFileName, self.entries, self.files, self.getParameters(), self.sampleParamMap)
    except Exception as e:
        traceback.print_exc(file=sys.stdout)
        self.error_stream("Error exporting %s: %s" % (self.metadataFileName, e))
def _makedict(self, dataset, initial=None):
    """Flatten a dataset object into a plain dict: proposal, beamline,
    dataset name/location/dates, sample name, the dataset parameters and
    the sample parameters from sampleParamMap (sample entries overwrite
    dataset parameters of the same name).

    :param dataset: dataset object as built by CreateDataset()
    :param initial: unused; kept for interface compatibility
    :returns dict:
    """
    d = dict()
    self.info_stream(">> makedict i")
    d['proposal'] = dataset.get_investigation()
    d['beamlineID'] = dataset.get_instrument()
    d['datasetName'] = dataset.get_name()
    d['location'] = dataset.get_location()
    d['startDate'] = dataset.get_startDate()
    d['endDate'] = dataset.get_endDate()
    d['sampleName'] = dataset.get_sample().get_name()
    self.info_stream(">> makedict iI")
    for p in dataset.get_parameter():
        d[p.get_name()] = p.get_value()
    # add parameters from sample
    for key in self.sampleParamMap:
        # BUGFIX: the literal previously had no surrounding spaces, so the
        # log read "<key>corresponds to<value>" run together.
        self.info_stream(key + ' corresponds to ' + self.sampleParamMap[key])
        d[key] = self.sampleParamMap[key]
    self.info_stream("\tMAKEDICT : [{0}] ".format(str(d)))
    return d
def Connect(self):
    """Refresh the STOMP client heartbeat from the State attribute polling
    period (x1.2 margin) and update the device state from the connection
    status. Failures are logged and heartbeat falls back to 0."""
    try:
        # heartbeat slightly longer than the poll period to avoid flapping
        beat = self.get_attribute_poll_period('State') * 1.2
        self.client.setHeartBeat(beat)
        self.client.beat()
        self.debug_stream("Connection status: %s" % self.client.getState())
        self.UpdateState()
    except Exception as e:
        traceback.print_exc(file=sys.stdout)
        self.warn_stream("Error retrieving poll value: %s" % e)
        beat = 0
def SetParameters(self, json_string):
    """This will overwrite ICAT key-value pairs defined
    by the "parameters" and "labels" device properties,
    but not the sample parameters.
    """
    self.customParamMap = json.loads(json_string)
    declared = set(self.parameters) | set(self.labels)
    incoming = set(self.customParamMap)
    overwritten = len(declared & incoming)
    original = len(declared) + len(self.sampleParamMap) - overwritten
    new = len(incoming) - overwritten
    self.info_stream(f"Add parameters ({new} new, {overwritten} overwritten, {original} original)")
# Gets the parameters defined on the parameters list in the tango device
# Return List
def getParameters(self):
    """Remote and custom parameters.
    :returns list(parameter):
    """
    try:
        notallowed = self.notallowed_parameters
        parameters = []
        # add the linked parameters (need to get the value)
        # async reads of every proxy
        callids = dict()
        for devstr, devproxy in self.attributeProxyMap.items():
            if devproxy:
                try:
                    callids[devproxy] = devproxy.read_asynch()
                except Exception as e:
                    self.error_stream("Error reading attribute %s: %s" % (devstr, e))
        # read replies and store values
        values = dict()
        for devstr, devproxy in self.attributeProxyMap.items():
            v = None
            if devproxy and devproxy in callids:
                try:
                    # 3000 ms timeout per reply
                    v = devproxy.read_reply(callids[devproxy], 3000, tango.ExtractAs.Numpy).value
                except Exception as e:
                    self.error_stream("Error getting attribute value %s: %s" % (devstr, e))
            values[devstr] = v
        # loop on param definition and add built values to dataset
        for n, d in self.parameterDefinitionMap.items():
            if d.isRemote():
                v = d.buildValue(self.attributeProxyMap, values)
                if v and n not in notallowed:
                    parameters.append(parameter(n, v))
    except:
        # NOTE(review): if notallowed_parameters raised above, 'parameters'
        # is unbound and the += below raises NameError — confirm intended.
        traceback.print_exc(file=sys.stdout)
    # append custom parameters, excluding names reserved for sample params
    notallowed = self.notallowed_custom
    parameters += list(parameter(n, v) for n, v in self.customParamMap.items()
                       if n not in notallowed)
    return parameters
@property
def notallowed_parameters(self):
    """Parameter names already claimed by labels, custom or sample params."""
    reserved = set(self.labels_map)
    reserved |= set(self.customParamMap)
    reserved |= set(self.sampleParamMap)
    return reserved
@property
def notallowed_labels(self):
    """Label names already claimed by custom or sample parameters."""
    return set(self.customParamMap).union(self.sampleParamMap)
@property
def notallowed_custom(self):
    """Custom parameter names already claimed by sample parameters."""
    return {name for name in self.sampleParamMap}
def CreateDataset(self, complete=True):
    """Delegate dataset creation to the metadata worker.

    Dataset parameters and labels whose names collide with
    higher-priority maps are filtered out first.
    """
    # Avoid name collisions:
    blocked = self.notallowed_parameters
    dataset_params = {name: value
                      for name, value in self.datasetParamMap.items()
                      if name not in blocked}
    blocked = self.notallowed_labels
    labels = {name: value
              for name, value in self.labels_map.items()
              if name not in blocked}
    return self.metadataWorker.createDataset(
        complete,
        self.attr_proposal_read,
        self.beamlineID.lower(),
        self.attr_datasetName_read,
        self.attr_dataFolder_read,
        self.startDate,
        self.endDate,
        labels,
        dataset_params,
        self.attr_sampleName_read,
        self.sampleParamMap,
        self.attr_datafileList_read)
def CleanupDatafiles(self):
    """Reset the datafile bookkeeping for the next dataset."""
    # forget every datafile registered so far
    self.attr_datafileList_read = []
    # and blank out the last-datafile attribute on the device itself
    self.proxy.write_attribute("lastDatafile", "")
def DefaultFolder(self):
    """Gets default data root.

    Replaces dataRoot, proposal, sampleName, datasetName and beamlineId
    placeholders in the configured pattern and returns the resulting
    normalized absolute path.
    """
    # BUG FIX: the original tested `self.datasetParentLocation is ''`,
    # an identity comparison with a literal (SyntaxWarning on CPython
    # >= 3.8, implementation-dependent result, and it sent None to
    # os.path.join). An empty/unset parent location now reliably selects
    # the plain dataFolderPattern branch.
    if not self.datasetParentLocation:
        folder = self._replaceInPath(self.dataFolderPattern)
        #self.info_stream("Replacing dataRoot folder. dataFolderPattern=%s folder=%s" % (self.dataFolderPattern, folder))
    else:
        patternDatasetParentLocation = os.path.join(self.datasetParentLocation, '{datasetName}')
        folder = self._replaceInPath(patternDatasetParentLocation)
        #self.info_stream("Replacing datasetParentLocation folder. datasetParentLocation=%s patternDatasetParentLocation=%s folder=%s" % (self.datasetParentLocation, patternDatasetParentLocation, folder))
    return folder
def _replaceInPath(self, s):
if self.dataRoot:
s = s.replace('{dataRoot}', self.dataRoot)
if self.attr_proposal_read:
s = s.replace('{proposal}', self.metadataWorker.Sanitize(self.attr_proposal_read))
if self.attr_sampleName_read:
s = s.replace('{sampleName}', self.metadataWorker.Sanitize(self.attr_sampleName_read))
if self.attr_datasetName_read:
s = s.replace('{datasetName}', self.metadataWorker.Sanitize(self.attr_datasetName_read))
s = s.replace('{beamlineID}', self.metadataWorker.Sanitize(self.beamlineID.lower()))
return os.path.normpath(s)
def UpdateState(self):
    """Derive the Tango device state/status from the experiment context."""
    if self.running:
        state, status = tango.DevState.RUNNING, "Dataset running"
    elif not self.attr_proposal_read:
        state, status = tango.DevState.OFF, "No experiment started"
    elif self.attr_sampleName_read and self.attr_datasetName_read:
        state, status = tango.DevState.ON, "Ready to start dataset"
    else:
        state, status = tango.DevState.STANDBY, "Experiment " + self.attr_proposal_read + " started"
    self.set_state(state)
    self.set_status(status)
    # Best-effort: append the messaging client status when available.
    try:
        self.append_status(self.client.getStatus(), True)
    except Exception:
        self.debug_stream("Error appending status")
def push_event(self, ev):
    """Handle attribute-change events pushed by the MetaExperiment device."""
    self.info_stream("Receiving event: " + ev.attr_name)
    attr_name = ev.attr_name
    if ev.err is True:
        self.error_stream("Error receiving event")
    elif attr_name.endswith(self.metaExperimentDevice + "/proposal"):
        value = ev.attr_value.value
        if value == 'please enter':
            value = ''
        if value != self.attr_proposal_read:
            self.attr_proposal_read = value
            self.attr_datasetNameList_read = []
            self.UpdateState()
    elif attr_name.endswith(self.metaExperimentDevice + "/sample"):
        # does nothing if the sample name does not change
        value = ev.attr_value.value
        if value == 'please enter':
            value = ''
        if value != self.attr_sampleName_read:
            # we clear sample parameters on a new sample
            self.sampleParamMap.clear()
            # clears dataset name
            if self.attr_sampleName_read and self.get_state() != tango.DevState.OFF:
                self.proxy.write_attribute("datasetName", "")
            self.attr_sampleName_read = value
            self.UpdateState()
    elif attr_name.endswith(self.metaExperimentDevice + "/dataroot"):
        self.dataRoot = ev.attr_value.value
    else:
        self.error_stream("Received event for unknown attribute:" + attr_name)
    # Update data folder for each change
    self.attr_dataFolder_read = self.DefaultFolder()
def appendMessage(self, msg):
    # Most-recent-first: new messages are pushed to the front of the deque.
    self.msgList.appendleft(msg)
def getID(self):
    """Return a unique device identifier: db_host:db_port/device_name."""
    proxy = self.proxy
    return f"{proxy.get_db_host()}:{proxy.get_db_port()}/{self.get_name()}"
#----- PROTECTED REGION END -----# // MetadataManager.programmer_methods
def SetDatasetFiles(self, attr):
    # Command hook: replace the whole registered-datafile list at once
    # (string array coming from the Tango command argument).
    self.attr_datafileList_read = attr
class MetadataManagerClass(tango.DeviceClass):
    """Tango DeviceClass for MetadataManager.

    Declares the class/device properties, the command table and the
    attribute table, and installs dynamic attributes on startup.
    """

    #--------- Add you global class variables here --------------------------
    #----- PROTECTED REGION ID(MetadataManager.global_class_variables) ENABLED START -----#
    #----- PROTECTED REGION END -----# // MetadataManager.global_class_variables

    def dyn_attr(self, dev_list):
        """Invoked to create dynamic attributes for the given devices.
        Default implementation calls
        :meth:`MetadataManager.initialize_dynamic_attributes` for each device
        :param dev_list: list of devices
        :type dev_list: :class:`tango.DeviceImpl`"""
        for dev in dev_list:
            try:
                dev.initialize_dynamic_attributes()
            except Exception:
                import traceback
                dev.warn_stream("Failed to initialize dynamic attributes")
                # BUG FIX: DeviceImpl has no `debug` method — the original
                # `dev.debug(...)` raised AttributeError here and hid the
                # real traceback; `debug_stream` is the Tango logging call.
                dev.debug_stream("Details: " + traceback.format_exc())
        #----- PROTECTED REGION ID(MetadataManager.dyn_attr) ENABLED START -----#
        #----- PROTECTED REGION END -----# // MetadataManager.dyn_attr

    # Class Properties
    class_property_list = {
        'queueURLs':
            [tango.DevVarStringArray,
             "URLs of the queues (including port)",
             ["bcu-mq-01.esrf.fr:61613"]],
        'queueName':
            [tango.DevString,
             "Name of the stomp queue for sending ingestion requests",
             ["/queue/icatIngest"]],
        'authenticationPlugin':
            [tango.DevString,
             "Name of the authentication plugin needed for ICAT (Only needed for checking parameters, not for ingestion)",
             ["db"]],
        'username':
            [tango.DevString,
             "Username used for connecting to ICAT",
             ["reader"]],
        'password':
            [tango.DevString,
             "Password used for connecting to ICAT",
             ["reader"]],
        'server':
            [tango.DevString,
             "Name of the machine where ICAT server is running",
             ["icat.esrf.fr"]],
        'port':
            [tango.DevString,
             "Port where ICAT server is running",
             ["80"]],
        'icatplus_server':
            [tango.DevString,
             "Server where ICAT+ is running",
             ["https://icatplus.esrf.fr"]],
        'graylogServer':
            [tango.DevString,
             "Graylog Server",
             ["graylog-dau.esrf.fr"]],
        'graylogPort':
            [tango.DevInt,
             "Graylog port",
             [12203]],
        'API_KEY':
            [tango.DevString,
             "API_KEY for connecting to ICAT+",
             [""]],
        'jolokiaPort':
            [tango.DevInt,
             "Jolokia port (same host as queueURLs)",
             [8778]]
    }

    # Device Properties
    device_property_list = {
        'beamlineID':
            [tango.DevString,
             "Beamline ID for reporting metadata",
             []],
        'parameters':
            [tango.DevVarStringArray,
             "Names of default dataset parameters.",
             []],
        'labels':
            [tango.DevVarStringArray,
             "List of labels.",
             []],
        'globalHDFfiles':
            [tango.DevVarStringArray,
             "List of proposal level hdf5 files to write.\nAllowed attributes are:\n{dataRoot} : data root folder (default to /data/visitor)\n{proposal} : proposal name\n{beamlineID} : beamline name",
             ["{dataRoot}/{proposal}/{beamlineID}/{proposal}-{beamlineID}.h5"]],
        'dataFolderPattern':
            [tango.DevString,
             "Defines the absolute path to the data folder as a pattern based on attributes.\nAllowed attributes are:\n{dataRoot} : data root folder (default to /data/visitor)\n{proposal} : proposal name\n{beamlineID} : beamline name\n{sampleName} : sample name\n{datasetName} : dataset name",
             ["{dataRoot}/{proposal}/{beamlineID}/{sampleName}/{datasetName}"]],
        'metaExperimentDevice':
            [tango.DevString,
             "Device that manages proposal and sample",
             []]
    }

    # Command definitions
    cmd_list = {
        'SetDatasetFiles':
            [[tango.DevVarStringArray, "none"],
             [tango.DevVoid, "none"]],
        'IngestTomodbXML':
            [[tango.DevString, "Absolute path to the tomodb XML file"],
             [tango.DevVoid, "none"]],
        'SetSampleParameters':
            [[tango.DevVarStringArray, "none"],
             [tango.DevVoid, "none"]],
        'StartDataset':
            [[tango.DevVoid, "none"],
             [tango.DevVoid, "none"]],
        'EndDataset':
            [[tango.DevVoid, "none"],
             [tango.DevVoid, "none"]],
        'AbortDataset':
            [[tango.DevVoid, "none"],
             [tango.DevVoid, "none"]],
        'InterruptDataset':
            [[tango.DevVoid, "none"],
             [tango.DevVoid, "none"]],
        'Reset':
            [[tango.DevVoid, "none"],
             [tango.DevVoid, "none"]],
        'CheckParameters':
            [[tango.DevVoid, "none"],
             [tango.DevString, "none"]],
        'GetDefaultFolder':
            [[tango.DevVoid, "none"],
             [tango.DevString, "none"]],
        'GetDataFolderPattern':
            [[tango.DevVoid, "none"],
             [tango.DevString, "none"]],
        'SetDatasetParentLocation':
            [[tango.DevString, "none"],
             [tango.DevVoid, "none"]],
        'GetDatasetParentLocation':
            [[tango.DevVoid, "none"],
             [tango.DevString, "none"]],
        'ClearDatasetParentLocation':
            [[tango.DevVoid, "none"],
             [tango.DevVoid, "none"]],
        'uploadBase64':
            [[tango.DevString, "Message"],
             [tango.DevVoid, "none"]],
        'notifyError':
            [[tango.DevString, "Message"],
             [tango.DevVoid, "none"]],
        'notifyInfo':
            [[tango.DevString, "Message"],
             [tango.DevVoid, "none"]],
        'notifyDebug':
            [[tango.DevString, "Message"],
             [tango.DevVoid, "none"]],
        'notifyCommand':
            [[tango.DevString, "Message"],
             [tango.DevVoid, "none"]],
        'userComment':
            [[tango.DevString, "Message"],
             [tango.DevVoid, "none"]],
        "SetParameters": [
            [tango.DevString, "JSON-representation of a parameter dictionary (overwrites)"],
            [tango.DevVoid, "none"],
        ],
    }

    # Attribute definitions
    attr_list = {
        'beamlineID':
            [[tango.DevString,
              tango.SCALAR,
              tango.READ],
             {
                 'description': "Proposal ID",
             }],
        'proposal':
            [[tango.DevString,
              tango.SCALAR,
              tango.READ],
             {
                 'description': "Proposal ID",
             }],
        'sampleName':
            [[tango.DevString,
              tango.SCALAR,
              tango.READ],
             {
                 'description': "Sample name",
             }],
        'datasetName':
            [[tango.DevString,
              tango.SCALAR,
              tango.READ_WRITE],
             {
                 'description': "Dataset number",
                 'Memorized': "true_without_hard_applied"
             }],
        'metadataFile':
            [[tango.DevString,
              tango.SCALAR,
              tango.READ_WRITE],
             {
                 'description': "Name of the file for storing metadata",
                 'Memorized': "true_without_hard_applied"
             }],
        'lastDatafile':
            [[tango.DevString,
              tango.SCALAR,
              tango.READ_WRITE],
             {
                 'description': "Name of the last datafile registered",
             }],
        'dataFolder':
            [[tango.DevString,
              tango.SCALAR,
              tango.READ_WRITE],
             {
                 'description': "Absolute path to the data folder",
                 'Memorized': "true_without_hard_applied"
             }],
        'datasetState':
            [[tango.DevString,
              tango.SCALAR,
              tango.READ],
             {
                 # FIX: corrected typo "dataset staet" in the user-visible description
                 'description': "dataset state",
             }],
        'sampleParameters':
            [[tango.DevString,
              tango.SPECTRUM,
              tango.READ, 256],
             {
                 'description': "Array of key-value pairs dataset parameters",
             }],
        'datafileList':
            [[tango.DevString,
              tango.SPECTRUM,
              tango.READ, 256],
             {
                 'description': "Full list of datafiles registered for this dataset",
             }],
        'datasetNameList':
            [[tango.DevString,
              tango.SPECTRUM,
              tango.READ, 256]],
        'messageList':
            [[tango.DevString,
              tango.SPECTRUM,
              tango.READ, 20]]
    }
def main():
    """Register the MetadataManager class and run the Tango server loop."""
    try:
        util = tango.Util(sys.argv)
        util.add_class(MetadataManagerClass, MetadataManager, 'MetadataManager')
        #----- PROTECTED REGION ID(MetadataManager.add_classes) ENABLED START -----#
        #----- PROTECTED REGION END -----# // MetadataManager.add_classes
        instance = tango.Util.instance()
        instance.server_init()
        instance.server_run()
    except tango.DevFailed as e:
        print('-------> Received a DevFailed exception:', e)
    except Exception as e:
        print('-------> An unforeseen exception occured....', e)
# Standard script entry point: start the Tango device server.
if __name__ == '__main__':
    main()