Skip to content
Snippets Groups Projects
Commit 7dc5232b authored by Generic Bliss account for Control Software's avatar Generic Bliss account for Control Software
Browse files

Upgrade to blissdata API 1.0

parent 90d0d0e4
1 merge request!69Draft: Upgrade to blissdata API 1.0
Pipeline #140194 failed
......@@ -10,14 +10,18 @@ from numpy.typing import ArrayLike
from silx.io import h5py_utils
from silx.utils import retry as retrymod
from silx.io.utils import get_data as silx_get_data
from blissdata.h5api import dynamic_hdf5
from blissdata.redis_engine.scan import Scan
from blissdata.redis_engine.stream import StreamingClient
from blissdata.redis_engine.models import ScanState
from blissdata.redis_engine.exceptions import EndOfStream
from blissdata.lima.client import lima_client_factory
try:
import gevent.queue # noqa F401 bliss MR 5369
from blissdata.data.node import get_node
from blissdata.data.events.lima import ImageNotSaved
except ImportError:
get_node = None
pass
from . import hdf5
from . import nexus
......@@ -163,57 +167,49 @@ def iter_bliss_scan_data_from_memory(
retry_timeout: Optional[Number] = None,
retry_period: Optional[Number] = None,
):
scan_node = _get_node(
db_name, "scan", retry_timeout=retry_timeout, retry_period=retry_period
)
indices = {name: 0 for name in lima_names + counter_names}
scan = Scan.load(db_name)
buffers = {name: list() for name in lima_names + counter_names}
lima_acq_nb = dict()
for event_type, node, event_data in scan_node.walk_events():
if node.type == "lima":
name = node.db_name.split(":")[-2]
if name not in lima_names:
continue
dataview = _get_lima_dataview(
node,
indices[name],
retry_timeout=retry_timeout,
retry_period=retry_period,
)
current_lima_acq_nb = dataview.status_event.status["lima_acq_nb"]
first_lima_acq_nb = lima_acq_nb.setdefault(name, current_lima_acq_nb)
if first_lima_acq_nb != current_lima_acq_nb:
logger.warning("lima is already acquiring the next scan")
continue
try:
data = list(dataview)
except ImageNotSaved:
logger.warning(
"cannot read lima data from file because images are not being saved"
)
continue
except Exception as e:
logger.warning("cannot read lima data (%s)", str(e))
continue
indices[name] += len(data)
buffers[name].extend(data)
elif node.type == "channel":
name = node.db_name.split(":")[-1]
if name not in counter_names:
continue
if event_data:
data = event_data.data
else:
data = node.get_as_array(indices[name], -1)
indices[name] += len(data)
buffers[name].extend(data)
nyield = min(len(v) for v in buffers.values())
if nyield:
for i in range(nyield):
yield {name: values[i] for name, values in buffers.items()}
buffers = {name: values[nyield:] for name, values in buffers.items()}
if event_type == event_type.END_SCAN:
while scan.state < ScanState.PREPARED:
scan.update()
if scan.state is ScanState.ABORTED:
return
lima_streams = dict()
lima_clients = dict()
counter_streams = dict()
for name, stream in scan.streams.items():
if stream.encoding["type"] == "json" and "lima" in stream.info["format"]:
if name.split(":")[-2] in lima_names:
lima_streams[name] = stream
lima_clients[name] = lima_client_factory(stream.info)
elif name.split(":")[-1] in counter_names:
counter_streams[name] = stream
client = StreamingClient({**lima_streams, **counter_streams})
while True:
try:
output = client.read()
except EndOfStream:
break
else:
for stream, data in output.items():
_, payload = data # drop index
if stream.name in lima_streams:
lima_client = lima_clients[stream.name]
prev_len = len(lima_client)
lima_client.update(**payload[-1])
data = lima_client[prev_len-1:]
buffers[stream.name.split(":")[-2]].extend(data)
else:
buffers[stream.name.split(":")[-1]].extend(payload)
nyield = min(len(v) for v in buffers.values())
if nyield:
for i in range(nyield):
yield {name: values[i] for name, values in buffers.items()}
buffers = {name: values[nyield:] for name, values in buffers.items()}
@retrymod.retry()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment