Skip to content
Snippets Groups Projects
Commit c36de6d9 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

WIP

parent 90d0d0e4
No related branches found
No related tags found
1 merge request!70Resolve "Provide support for bliss 2.0"
Pipeline #150946 passed
......@@ -5,6 +5,30 @@ include:
test-3.7:
extends: .test-3.7
test-3.9-master:
extends: .test-3.9
before_script:
- git clone https://gitlab.esrf.fr/bliss/bliss.git --branch master blissrepo
- pip install blissrepo/blissdata
- rm -rf blissrepo
- !reference [.test-3.9, before_script]
test-3.9-id31_2.0:
extends: .test-3.9
before_script:
- git clone https://gitlab.esrf.fr/bliss/bliss.git --branch id31_2.0 blissrepo
- pip install blissrepo/blissdata
- rm -rf blissrepo
- !reference [.test-3.9, before_script]
test-3.9-1.11.x:
extends: .test-3.9
before_script:
- git clone https://gitlab.esrf.fr/bliss/bliss.git --branch 1.11.x blissrepo
- pip install blissrepo/blissdata redis
- rm -rf blissrepo
- !reference [.test-3.9, before_script]
build_sdist:
extends: .build_sdist
......
......@@ -10,14 +10,10 @@ from numpy.typing import ArrayLike
from silx.io import h5py_utils
from silx.utils import retry as retrymod
from silx.io.utils import get_data as silx_get_data
from blissdata.h5api import dynamic_hdf5
try:
import gevent.queue # noqa F401 bliss MR 5369
from blissdata.data.node import get_node
from blissdata.data.events.lima import ImageNotSaved
except ImportError:
get_node = None
from .blissdata import iter_bliss_scan_data_from_memory # noqa F401
from .blissdata import last_lima_image # noqa F401
from .blissdata import dynamic_hdf5
from . import hdf5
from . import nexus
......@@ -140,100 +136,3 @@ def iter_bliss_data(
def _is_bliss_file(h5item: Union[h5py.Dataset, h5py.Group]) -> bool:
return h5item.file.attrs.get("creator", "").lower() == "bliss"
def last_lima_image(db_name: str) -> ArrayLike:
"""Get last lima image from memory"""
node = _get_node(db_name, "lima")
node.from_stream = True
dataview = node.get(-1)
try:
image = dataview.get_last_live_image()
except AttributeError:
image = None
if image is None or image.data is None:
raise RuntimeError("Cannot get last image from lima")
return image.data
def iter_bliss_scan_data_from_memory(
db_name: str,
lima_names: List[str],
counter_names: List[str],
retry_timeout: Optional[Number] = None,
retry_period: Optional[Number] = None,
):
scan_node = _get_node(
db_name, "scan", retry_timeout=retry_timeout, retry_period=retry_period
)
indices = {name: 0 for name in lima_names + counter_names}
buffers = {name: list() for name in lima_names + counter_names}
lima_acq_nb = dict()
for event_type, node, event_data in scan_node.walk_events():
if node.type == "lima":
name = node.db_name.split(":")[-2]
if name not in lima_names:
continue
dataview = _get_lima_dataview(
node,
indices[name],
retry_timeout=retry_timeout,
retry_period=retry_period,
)
current_lima_acq_nb = dataview.status_event.status["lima_acq_nb"]
first_lima_acq_nb = lima_acq_nb.setdefault(name, current_lima_acq_nb)
if first_lima_acq_nb != current_lima_acq_nb:
logger.warning("lima is already acquiring the next scan")
continue
try:
data = list(dataview)
except ImageNotSaved:
logger.warning(
"cannot read lima data from file because images are not being saved"
)
continue
except Exception as e:
logger.warning("cannot read lima data (%s)", str(e))
continue
indices[name] += len(data)
buffers[name].extend(data)
elif node.type == "channel":
name = node.db_name.split(":")[-1]
if name not in counter_names:
continue
if event_data:
data = event_data.data
else:
data = node.get_as_array(indices[name], -1)
indices[name] += len(data)
buffers[name].extend(data)
nyield = min(len(v) for v in buffers.values())
if nyield:
for i in range(nyield):
yield {name: values[i] for name, values in buffers.items()}
buffers = {name: values[nyield:] for name, values in buffers.items()}
if event_type == event_type.END_SCAN:
break
@retrymod.retry()
def _get_node(db_name: str, node_type: str):
if get_node is None:
raise ModuleNotFoundError("No module named 'blissdata'")
node = get_node(db_name)
if node is None:
raise retrymod.RetryError(f"Redis node {db_name} does not exist")
if node.type != node_type:
raise RuntimeError(f"Not a Redis {node_type} node")
return node
@retrymod.retry()
def _get_lima_dataview(node, start_index: int):
dataview = node.get(start_index, -1)
try:
if dataview.status_event.proxy is None:
raise retrymod.RetryError("Lima proxy not known (yet)")
except Exception:
raise retrymod.RetryError("Lima proxy not known (yet)")
return dataview
try:
# blissdata >=1
from .blissdatav1 import iter_bliss_scan_data_from_memory
from .blissdatav1 import last_lima_image
from blissdata.h5api import dynamic_hdf5
except ImportError:
try:
# blissdata >0.3.3,<1 (unreleased, branch id31_2.0)
from .blissdatavid31 import iter_bliss_scan_data_from_memory
from .blissdatavid31 import last_lima_image
from blissdata.h5api import dynamic_hdf5
except ImportError:
try:
# blissdata <=0.3.3
from .blissdatav0 import iter_bliss_scan_data_from_memory
from .blissdatav0 import last_lima_image
from blissdata.h5api import dynamic_hdf5
except ImportError as exc: # noqa F841
_EXC = exc
def dynamic_hdf5(*args, **kw):
raise _EXC
def iter_bliss_scan_data_from_memory(*args, **kw):
raise _EXC
def last_lima_image(*args, **kw):
raise _EXC
import logging
from numbers import Number
from typing import List, Optional
from numpy.typing import ArrayLike
from silx.utils import retry as retrymod
from blissdata.data.node import get_node
from blissdata.data.events.lima import ImageNotSaved
logger = logging.getLogger(__name__)
def iter_bliss_scan_data_from_memory(
db_name: str,
lima_names: List[str],
counter_names: List[str],
retry_timeout: Optional[Number] = None,
retry_period: Optional[Number] = None,
):
scan_node = _get_node(
db_name, "scan", retry_timeout=retry_timeout, retry_period=retry_period
)
indices = {name: 0 for name in lima_names + counter_names}
buffers = {name: list() for name in lima_names + counter_names}
lima_acq_nb = dict()
for event_type, node, event_data in scan_node.walk_events():
if node.type == "lima":
name = node.db_name.split(":")[-2]
if name not in lima_names:
continue
dataview = _get_lima_dataview(
node,
indices[name],
retry_timeout=retry_timeout,
retry_period=retry_period,
)
current_lima_acq_nb = dataview.status_event.status["lima_acq_nb"]
first_lima_acq_nb = lima_acq_nb.setdefault(name, current_lima_acq_nb)
if first_lima_acq_nb != current_lima_acq_nb:
logger.warning("lima is already acquiring the next scan")
continue
try:
data = list(dataview)
except ImageNotSaved:
logger.warning(
"cannot read lima data from file because images are not being saved"
)
continue
except Exception as e:
logger.warning("cannot read lima data (%s)", str(e))
continue
indices[name] += len(data)
buffers[name].extend(data)
elif node.type == "channel":
name = node.db_name.split(":")[-1]
if name not in counter_names:
continue
if event_data:
data = event_data.data
else:
data = node.get_as_array(indices[name], -1)
indices[name] += len(data)
buffers[name].extend(data)
nyield = min(len(v) for v in buffers.values())
if nyield:
for i in range(nyield):
yield {name: values[i] for name, values in buffers.items()}
buffers = {name: values[nyield:] for name, values in buffers.items()}
if event_type == event_type.END_SCAN:
break
def last_lima_image(db_name: str) -> ArrayLike:
"""Get last lima image from memory"""
node = _get_node(db_name, "lima")
node.from_stream = True
dataview = node.get(-1)
try:
image = dataview.get_last_live_image()
except AttributeError:
image = None
if image is None or image.data is None:
raise RuntimeError("Cannot get last image from lima")
return image.data
@retrymod.retry()
def _get_node(db_name: str, node_type: str):
node = get_node(db_name)
if node is None:
raise retrymod.RetryError(f"Redis node {db_name} does not exist")
if node.type != node_type:
raise RuntimeError(f"Not a Redis {node_type} node")
return node
@retrymod.retry()
def _get_lima_dataview(node, start_index: int):
dataview = node.get(start_index, -1)
try:
if dataview.status_event.proxy is None:
raise retrymod.RetryError("Lima proxy not known (yet)")
except Exception:
raise retrymod.RetryError("Lima proxy not known (yet)")
return dataview
from numbers import Number
from typing import List, Optional
from numpy.typing import ArrayLike
from blissdata.beacon.data import BeaconData
from blissdata.redis_engine.store import DataStore
from blissdata.redis_engine.scan import Scan
from blissdata.redis_engine.scan import ScanState
from blissdata.redis_engine.stream import StreamingClient
from blissdata.redis_engine.exceptions import EndOfStream
from blissdata.redis_engine.exceptions import IndexNoMoreThereError
from blissdata.lima.client import lima_client_factory
def iter_bliss_scan_data_from_memory(
db_name: str,
lima_names: List[str],
counter_names: List[str],
retry_timeout: Optional[Number] = None,
retry_period: Optional[Number] = None,
):
data_store = _get_data_store()
scan = data_store.load_scan(db_name, scan_cls=Scan)
buffers = {name: list() for name in lima_names + counter_names}
while scan.state < ScanState.PREPARED:
scan.update()
lima_streams = dict()
lima_clients = dict()
counter_streams = dict()
for name, stream in scan.streams.items():
if stream.encoding["type"] == "json" and "lima" in stream.info["format"]:
if name.split(":")[-2] in lima_names:
lima_streams[name] = stream
lima_clients[name] = lima_client_factory(data_store, stream.info)
elif name.split(":")[-1] in counter_names:
counter_streams[name] = stream
client = StreamingClient({**lima_streams, **counter_streams})
while True:
try:
output = client.read()
except EndOfStream:
break
else:
for stream, data in output.items():
_, payload = data # drop index
if stream.name in lima_streams:
lima_client = lima_clients[stream.name]
prev_len = len(lima_client)
lima_client.update(**payload[-1])
try:
data = lima_client[prev_len - 1 :]
except IndexNoMoreThereError:
continue
buffers[stream.name.split(":")[-2]].extend(data)
else:
buffers[stream.name.split(":")[-1]].extend(payload)
nyield = min(len(v) for v in buffers.values())
if nyield:
for i in range(nyield):
yield {name: values[i] for name, values in buffers.items()}
buffers = {name: values[nyield:] for name, values in buffers.items()}
def last_lima_image(channel_info: dict) -> ArrayLike:
"""Get last lima image from memory"""
data_store = _get_data_store()
lima_client = lima_client_factory(data_store, channel_info)
return lima_client.get_last_live_image().array
def _get_data_store() -> None:
redis_url = BeaconData().get_redis_data_db()
return DataStore(redis_url)
from numbers import Number
from typing import List, Optional
from numpy.typing import ArrayLike
from blissdata.redis_engine.scan import Scan
from blissdata.redis_engine.stream import StreamingClient
from blissdata.redis_engine.models import ScanState
from blissdata.redis_engine.exceptions import EndOfStream
from blissdata.lima.client import lima_client_factory
from blissdata.beacon.data import BeaconData
from blissdata import redis_engine
def iter_bliss_scan_data_from_memory(
db_name: str,
lima_names: List[str],
counter_names: List[str],
retry_timeout: Optional[Number] = None,
retry_period: Optional[Number] = None,
):
_ensure_redis()
scan = Scan.load(db_name)
buffers = {name: list() for name in lima_names + counter_names}
while scan.state < ScanState.PREPARED:
scan.update()
if scan.state is ScanState.ABORTED:
return
lima_streams = dict()
lima_clients = dict()
counter_streams = dict()
for name, stream in scan.streams.items():
if stream.encoding["type"] == "json" and "lima" in stream.info["format"]:
if name.split(":")[-2] in lima_names:
lima_streams[name] = stream
lima_clients[name] = lima_client_factory(stream.info)
elif name.split(":")[-1] in counter_names:
counter_streams[name] = stream
client = StreamingClient({**lima_streams, **counter_streams})
while True:
try:
output = client.read()
except EndOfStream:
break
else:
for stream, data in output.items():
_, payload = data # drop index
if stream.name in lima_streams:
lima_client = lima_clients[stream.name]
prev_len = len(lima_client)
lima_client.update(**payload[-1])
data = lima_client[prev_len - 1 :]
buffers[stream.name.split(":")[-2]].extend(data)
else:
buffers[stream.name.split(":")[-1]].extend(payload)
nyield = min(len(v) for v in buffers.values())
if nyield:
for i in range(nyield):
yield {name: values[i] for name, values in buffers.items()}
buffers = {name: values[nyield:] for name, values in buffers.items()}
def last_lima_image(channel_info: dict) -> ArrayLike:
"""Get last lima image from memory"""
_ensure_redis()
lima_client = lima_client_factory(channel_info)
return lima_client.get_last_live_image().array
def _ensure_redis() -> None:
url = BeaconData().get_redis_data_db()
current_url = _current_redis_url()
if current_url:
if url == current_url:
return
raise RuntimeError("The Redis URL has changed. Restart the Nexus writer.")
redis_engine.set_redis_url(url)
def _current_redis_url() -> Optional[str]:
if redis_engine._redis is None:
return
kwargs = redis_engine._redis.connection_pool.connection_kwargs
if "path" in kwargs:
return f"unix://{kwargs['path']}"
else:
return f"redis://{kwargs['host']}:{kwargs['port']}"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment