Commit 7a4aab48 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Client for direct communication with ICAT

parent dc11a614
"""Currently two ICAT clients exist with the same API:
1. `bliss.icat.client.IcatClient`: communicates directly with ICAT
2. `bliss.icat.client.IcatTangoProxy`: communicates through tango servers
`IcatClient` is used when configured in the beacon configuration
icat_servers:
metadata_urls: [URL1, URL2]
elogbook_url: URL3
elogbook_token: elogbook-00000000-0000-0000-0000-000000000000
The default URL prefix is "tcp://" for the metadata and "https://"
for the e-logbook.
When not configured BLISS will fall back to `IcatTangoProxy` which
requires two TANGO devices with names "id00/metaexp/session_name"
and "id00/metadata/session_name". The configuration is done in the
TANGO device class properties.
"""
from .main import IcatClient # noqa: F401
from .tango import IcatTangoProxy # noqa: F401
from .config import icat_client_from_config # noqa: F401
from bliss.config.static import get_config
from bliss.icat.client.main import IcatClient
def icat_client_from_config():
kwargs = get_config().root.get("icat_servers")
return IcatClient(**kwargs)
from enum import Enum
from datetime import datetime
import requests
import logging
import base64
import mimetypes
from bliss.icat.client.url import normalize_url
logger = logging.getLogger(__name__)
MessageCategory = Enum("MessageCategory", "debug info error commandLine comment")
MessageType = Enum("MessageType", "annotation notification")
MessageCategoryMapping = {
"debug": MessageCategory.debug,
"info": MessageCategory.info,
"warning": MessageCategory.error,
"warn": MessageCategory.error,
"error": MessageCategory.error,
"critical": MessageCategory.error,
"fatal": MessageCategory.error,
"command": MessageCategory.commandLine,
"comment": MessageCategory.comment,
}
MessageTypeMapping = {
MessageCategory.debug: MessageType.notification,
MessageCategory.info: MessageType.notification,
MessageCategory.error: MessageType.notification,
MessageCategory.commandLine: MessageType.notification,
MessageCategory.comment: MessageType.annotation,
}
class IcatElogbookClient:
"""Client for the e-logbook part of the ICAT+ REST API.
REST API docs:
https://icatplus.esrf.fr/api-docs/
The ICAT+ server project:
https://gitlab.esrf.fr/icat/icat-plus/-/blob/master/README.md
"""
DEFAULT_SCHEME = "https"
def __init__(
self, url: str, api_key: str = "elogbook-00000000-0000-0000-0000-000000000000"
):
url = normalize_url(url, default_scheme=self.DEFAULT_SCHEME)
self._message_url = f"{url}/logbook/{api_key}/investigation/name/{{proposal}}/instrument/name/{{beamline}}/event"
self._data_url = f"{url}/logbook/{api_key}/investigation/name/{{proposal}}/instrument/name/{{beamline}}/event/createFrombase64"
self.timeout = 0.5
def send_message(
self, message: str, category: str, proposal: str, beamline: str, dataset: str
):
url = self._message_url.format(proposal=proposal, beamline=beamline)
payload = self.encode_message(message, category, dataset)
requests.post(url, json=payload, timeout=self.timeout)
def send_text_file(self, filename: str, proposal: str, beamline: str, dataset: str):
with open(filename, "r") as f:
message = f.read()
self.send_message(
message,
category="comment",
proposal=proposal,
beamline=beamline,
dataset=dataset,
)
def send_data(self, data: bytes, mimetype: str, proposal: str, beamline: str):
url = self._data_url.format(proposal=proposal, beamline=beamline)
payload = self.encode_mime_data(data, mimetype)
requests.post(url, json=payload, timeout=self.timeout)
def send_binary_file(self, filename: str, proposal: str, beamline: str):
with open(filename, "rb") as f:
data = f.read()
mimetype, _ = mimetypes.guess_type(filename, strict=True)
self.send_data(data, mimetype=mimetype, proposal=proposal, beamline=beamline)
@staticmethod
def encode_mime_data(data: bytes, mimetype: str) -> str:
if not mimetype:
# arbitrary binary data
mimetype = "application/octet-stream"
data_header = f"data:{mimetype};base64,"
data_blob = base64.b64encode(data).decode("latin-1")
return {
"base64": data_header + data_blob,
"creationDate": str(datetime.now().isoformat()),
}
@staticmethod
def encode_message(message: str, category: str, dataset: str) -> str:
try:
category = MessageCategoryMapping[category.lower()]
except KeyError:
raise ValueError(category, "Not a valid e-logbook category") from None
message_type = MessageTypeMapping[category]
return {
"type": message_type.name,
"datasetName": dataset,
"category": category.name,
"content": [{"format": "plainText", "text": message}],
"creationDate": str(datetime.now().isoformat()),
}
from bliss.icat.client.elogbook import IcatElogbookClient
from bliss.icat.client.metadata import IcatMetadataClient
from bliss import current_session
class IcatClient:
"""Direct communication with ICAT: e-logbook and metadata
"""
def __init__(
self,
metadata_urls: list,
elogbook_url: str,
elogbook_token: str = "elogbook-00000000-0000-0000-0000-000000000000",
metadata_queue: str = "icatIngest",
metadata_queue_monitor_port: int = None,
):
self._elogbook = IcatElogbookClient(url=elogbook_url, api_key=elogbook_token)
self._metadata = IcatMetadataClient(
queue_urls=metadata_urls,
queue_name=metadata_queue,
monitor_port=metadata_queue_monitor_port,
)
def send_message(
self,
msg: str,
msg_type="comment",
proposal: str = None,
beamline: str = None,
dataset: str = None,
):
if proposal is None:
proposal = current_session.scan_saving.proposal_name
if beamline is None:
beamline = current_session.scan_saving.beamline
if dataset is None:
dataset = current_session.scan_saving.dataset_name
self._elogbook.send_message(
message=msg,
category=msg_type,
proposal=proposal,
beamline=beamline,
dataset=dataset,
)
def send_data(
self,
data: bytes,
mimetype: str = None,
proposal: str = None,
beamline: str = None,
):
if proposal is None:
proposal = current_session.scan_saving.proposal_name
if beamline is None:
beamline = current_session.scan_saving.beamline
self._elogbook.send_data(
data, mimetype=mimetype, proposal=proposal, beamline=beamline
)
def send_text_file(
self,
filename: str,
proposal: str = None,
beamline: str = None,
dataset: str = None,
):
if proposal is None:
proposal = current_session.scan_saving.proposal_name
if beamline is None:
beamline = current_session.scan_saving.beamline
if dataset is None:
dataset = current_session.scan_saving.dataset_name
self._elogbook.send_text_file(
filename, proposal=proposal, beamline=beamline, dataset=dataset
)
def send_binary_file(
self, filename: str, proposal: str = None, beamline: str = None
):
if proposal is None:
proposal = current_session.scan_saving.proposal_name
if beamline is None:
beamline = current_session.scan_saving.beamline
self._elogbook.send_binary_file(filename, proposal=proposal, beamline=beamline)
def start_investigation(
self, proposal: str = None, beamline: str = None, start_datetime=None
):
if proposal is None:
proposal = current_session.scan_saving.proposal_name
if beamline is None:
beamline = current_session.scan_saving.beamline
self._metadata.start_investigation(
proposal=proposal, beamline=beamline, start_datetime=start_datetime
)
def store_dataset(
self,
proposal: str = None,
beamline: str = None,
collection: str = None,
dataset: str = None,
path: str = None,
metadata: dict = None,
start_datetime=None,
end_datetime=None,
):
if proposal is None:
proposal = current_session.scan_saving.proposal_name
if beamline is None:
beamline = current_session.scan_saving.beamline
if collection is None:
collection = current_session.scan_saving.collection
if dataset is None:
dataset = current_session.scan_saving.dataset_name
if path is None:
path = current_session.scan_saving.icat_root_path
if metadata is None:
metadata = current_session.scan_saving.dataset.get_current_icat_metadata()
self._metadata.send_metadata(
proposal=proposal,
beamline=beamline,
collection=collection,
dataset=dataset,
path=path,
metadata=metadata,
start_datetime=start_datetime,
end_datetime=end_datetime,
)
def ping(self, *args, **kw):
"""For compatibility with IcatTangoProxy
"""
pass
import base64
import requests
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from stompest.protocol import StompSession
from stompest.sync import Stomp
from bliss.icat.client.url import normalize_url
class IcatMessagingClient:
"""Client for the ICAT message broker.
The message broker is currently ActiveMQ, a message
broker using the STOMP protocol. It also has a REST
server for monitoring the broker status.
"""
DEFAULT_SCHEME = "tcp"
DEFAULT_PORT = 61613
MONITOR_SCHEME = "http"
DEFAULT_MONITOR_PORT = 8778
MONITOR_USER = "user"
MONITOR_PWD = "user"
def __init__(self, queue_urls: list, queue_name: str, monitor_port: int = None):
urls = [
normalize_url(
url, default_scheme=self.DEFAULT_SCHEME, default_port=self.DEFAULT_PORT
)
for url in queue_urls
]
failover = ",".join(urls)
url = f"failover:({failover})?maxReconnectAttempts=3,initialReconnectDelay=250,maxReconnectDelay=1000"
self.__max_version = StompSpec.VERSION_1_1
self.__client = Stomp(StompConfig(url, version=self.__max_version))
self.socket_timeout = None
self.connect_timeout = 1
self.__send_destination = "/queue/" + queue_name
self.__send_headers = {
"persistent": "true",
StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
}
if not monitor_port:
monitor_port = self.DEFAULT_MONITOR_PORT
self.__consumer_count_url = f"{self.MONITOR_SCHEME}://{{host}}:{monitor_port}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=metadata,destinationType=Queue,destinationName={queue_name}/ConsumerCount"
self.__jolokia_headers = {
"Authorization": b"Basic "
+ base64.b64encode(f"{self.MONITOR_USER}:{self.MONITOR_PWD}".encode())
}
def close(self):
self.__client.close()
@property
def _connected_client(self):
if self.__client.session.state != StompSession.CONNECTED:
self.__client.connect(
versions=[self.__max_version],
connectTimeout=self.socket_timeout,
connectedTimeout=self.connect_timeout,
)
return self.__client
@property
def _host(self):
return self._connected_client._transport.host
def send(self, data: bytes):
self._connected_client.send(
self.__send_destination, body=data, headers=self.__send_headers
)
@property
def _consumer_count(self):
url = self.__consumer_count_url.format(host=self._host)
response = requests.get(url, headers=self.__jolokia_headers)
if not response.ok:
raise RuntimeError(
response, "Failed to retrieve the ActiveMQ consumer count"
)
response = response.json()
return response["value"]
def check_health(self):
"""Raises an exception when:
- not connected
- no message consumers
"""
state = self._connected_client.session.state
if state != StompSession.CONNECTED:
raise RuntimeError(
"The connection with the message broker is " + str(state).upper()
)
if self._consumer_count < 1:
raise RuntimeError("The message broker has no consumers")
import xml.etree.ElementTree as etree
from bliss.icat.client.xmlns import dataset_as_xml
from bliss.icat.client.xmlns import investigation_as_xml
from bliss.icat.client.messaging import IcatMessagingClient
class IcatMetadataClient:
"""Client for storing dataset metadata in ICAT.
"""
def __init__(
self, queue_urls: list, queue_name: str = "icatIngest", monitor_port: int = None
):
self._client = IcatMessagingClient(
queue_urls, queue_name, monitor_port=monitor_port
)
def send_metadata(
self,
proposal: str,
beamline: str,
collection: str,
dataset: str,
path: str,
metadata: dict,
start_datetime=None,
end_datetime=None,
):
root = dataset_as_xml(
proposal=proposal,
beamline=beamline,
collection=collection,
dataset=dataset,
path=path,
metadata=metadata,
start_datetime=start_datetime,
end_datetime=end_datetime,
)
self._client.send(etree.tostring(root))
def start_investigation(self, proposal: str, beamline: str, start_datetime=None):
root = investigation_as_xml(
proposal=proposal, beamline=beamline, start_datetime=start_datetime
)
self._client.send(etree.tostring(root))
def check_health(self):
"""Raises an exception when not healthy
"""
self._client.check_health()
......@@ -14,7 +14,7 @@ Currently the communication goes through two tango devices.
To start communication with the ICAT ingester, it is sufficient
to instantiate a proxy:
icat_proxy = IcatIngesterProxy(beamline, session)
icat_proxy = IcatTangoProxy(beamline, session)
The tango devices are discovered automatically based on beamline
name and Bliss session name.
......@@ -26,6 +26,7 @@ import functools
import json
import logging
import warnings
import base64
from bliss.common.tango import DeviceProxy, DevState
from bliss.tango.clients.utils import (
is_devfailed,
......@@ -40,7 +41,7 @@ logger = logging.getLogger(__name__)
class IcatError(RuntimeError):
"""Unified exception raised by IcatIngesterProxy, from Timeout or Devfailed
"""Unified exception raised by IcatTangoProxy, from Timeout or Devfailed
(the cause is preserved).
"""
......@@ -285,7 +286,7 @@ class IcatDeviceProxy:
raise
class IcatIngesterProxy(object):
class IcatTangoProxy:
"""This class provides one API to ICAT ingester (currently two tango devices) in a singleton pattern.
It is a proxy and therefore does not contain state.
"""
......@@ -674,15 +675,15 @@ class IcatIngesterProxy(object):
)
@icat_comm
def start_dataset(self, proposal, sample, dataset, path, comm_state=None):
"""Set the proposal, sample and dataset name. Then start the
def start_dataset(self, proposal, collection, dataset, path, comm_state=None):
"""Set the proposal, collection and dataset name. Then start the
dataset. The final state (when not exception is raised)
will be RUNNING.
This method is NOT always idempotent as it modifies the state.
:param str proposal:
:param str sample:
:param str collection:
:param str dataset:
:param str path: full path of the dataset
:param dict comm_state:
......@@ -691,8 +692,8 @@ class IcatIngesterProxy(object):
comm_state["error_msg"] = "Failed to start the ICAT dataset"
if proposal != self.get_proposal(comm_state=comm_state):
self.set_proposal(proposal, comm_state=comm_state)
if sample != self.get_sample(comm_state=comm_state):
self.set_sample(sample, comm_state=comm_state)
if collection != self.get_sample(comm_state=comm_state):
self.set_sample(collection, comm_state=comm_state)
if dataset != self.get_dataset(comm_state=comm_state):
self.set_dataset(dataset, comm_state=comm_state)
if path != self.get_path(comm_state=comm_state):
......@@ -753,21 +754,31 @@ class IcatIngesterProxy(object):
except IcatError as e:
logger.error(self, f"elogbook: {e}")
@icat_comm
def send_data(self, data, mimetype=None, comm_state=None):
data = f"data:{mimetype};base64," + base64.b64encode(data).decode("latin-1")
try:
self.metadata_manager.exec_command(
"uploadBase64", args=(data,), comm_state=comm_state
)
except IcatError as e:
logger.error(self, f"elogbook: {e}")
@icat_comm
def store_dataset(
self, proposal, sample, dataset, path, metadata=None, comm_state=None
self, proposal, collection, dataset, path, metadata=None, comm_state=None
):
"""Send a new dataset to the ICAT ingester.
:param str proposal:
:param str sample:
:param str collection:
:param str dataset:
:param str path: full path of the dataset
:param dict metadata: optional dataset metadata
:param dict comm_state:
:raises IcatError:
"""
self.start_dataset(proposal, sample, dataset, path, comm_state=comm_state)
self.start_dataset(proposal, collection, dataset, path, comm_state=comm_state)
comm_state["error_msg"] = "Failed to push ICAT metadata"
if metadata:
json_string = json.dumps(metadata)
......
from urllib.parse import urlparse
def normalize_url(
url: str,
default_scheme: str = None,
default_port: int = None,
absolute: bool = True,
) -> str:
if not url:
raise ValueError("URL is missing")
if absolute and "//" not in url:
url = "//" + url
result = urlparse(url)
scheme, netloc, *others = result
if not netloc:
raise ValueError(url, "URL is missing a network location")
if not scheme and default_scheme:
scheme = default_scheme
if default_port and not result.port:
netloc = f"{result.hostname}:{default_port}"
newurl = type(result)(scheme, netloc, *others)
return newurl.geturl()
from datetime import datetime
import xml.etree.ElementTree as etree
ICAT_NAMESPACE_URL = "http://www.esrf.fr/icat"
etree.register_namespace("tns", ICAT_NAMESPACE_URL)
def root_node(name: str, **kw):
return etree.Element(f"{{{ICAT_NAMESPACE_URL}}}{name}", **kw)
def child_node(parent, name: str, **kw):
return etree.SubElement(parent, f"{{{ICAT_NAMESPACE_URL}}}{name}", **kw)
def encode_node_data(data) -> str:
if isinstance(data, str):
return data
elif isinstance(data, (list, tuple)):
return " ".join(list(map(str, data)))
elif isinstance(data, bytes):
return data.encode()
elif isinstance(data, datetime):
return data.isoformat()
else:
return str(data)
def data_node(parent, name: str, data, **kw):
node = child_node(parent, name, **kw)
node.text = encode_node_data(data)
def parameter_node(parent, name: str, value, **kw):
node = child_node(parent, "parameter", **kw)
data_node(node, "name", name)
data_node(node, "value", value)
def dataset_as_xml(
proposal: str,
beamline: str,
collection: str,
dataset: str,
path: str,
metadata: dict = None,
start_datetime=None,
end_datetime=None,
):
root = root_node("dataset", attrib={"complete": "true"})
data_node(root, "investigation", proposal)
data_node(root, "instrument", beamline)