Commit 64bb68cc authored by Linus Pithan's avatar Linus Pithan
Browse files

add icat controller

parent c831cff1
......@@ -212,6 +212,10 @@ class Session:
self.__children_tree = None
self.__include_sessions = config_tree.get("include-sessions")
self.__config_aliases = config_tree.get("aliases", [])
if config_tree.get("icat-mapping"):
self.__icat_mapping = self.config.get(config_tree.get("icat-mapping"))
self.__icat_mapping = None
self.__default_user_script_homedir = config_tree.get("default-userscript-dir")
if self.__default_user_script_homedir and not self._get_user_script_home():
......@@ -282,6 +286,10 @@ class Session:
def _scripts_module_path(self):
return self.__scripts_module_path
def icat_mapping(self):
return self.__icat_mapping
def _child_session_iter(self):
sessions_tree = self.sessions_tree
for child_session in reversed(
# -*- coding: utf-8 -*-
# This file is part of the bliss project
# Copyright (c) 2015-2010 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
from bliss.icat.definitions import Definitions
from os.path import commonprefix
from bliss.common.protocols import IcatPublisher
from bliss.config.static import ConfigList
class ICATmeta:
def __init__(self, name, config):
self.config = config
self.mapping = config.get("mapping", {})
self.objects = config.get("objects", {})
self.positioners = config.get("positioners", {})
self.definitions = Definitions()
for key, obj in self.objects.items():
if not isinstance(obj, IcatPublisher):
raise RuntimeError(
f"{} ({key}) is not a valid metadata publisher!"
def get_metadata(self):
res = dict()
for key, value in self.mapping.items():
if isinstance(value, ConfigList):
res[key] = ""
for sub_value in value:
res[key] += str(sub_value) + " "
res[key] = str(value)
instrumentation = self.definitions.instrumentation._asdict()
for key, device in self.objects.items():
assert key in instrumentation, f"{key} is not a known icat field group"
assert hasattr(
device, "metadata"
), f"{} has no metadata function"
prefix = commonprefix(list(instrumentation[key].fields))
obj_meta = device.metadata()
for icat_key in instrumentation[key].fields:
obj_key = icat_key.split(prefix)[-1]
if obj_key in obj_meta:
res[icat_key] = str(obj_meta[obj_key])
positioners = self.definitions.positioners._asdict()
for key, pos_list in self.positioners.items():
if not isinstance(pos_list, list):
pos_list = list(pos_list)
assert key in positioners, f"{key} not a known positioner group in icat"
icat_name = [x for x in positioners[key].fields if "_name" in x][0]
icat_value = [x for x in positioners[key].fields if "_value" in x][0]
res[icat_name] = " ".join([ for x in pos_list])
res[icat_value] = " ".join([str(x.position) for x in pos_list])
return res
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment