Commit ae9ed718 authored by bliss administrator's avatar bliss administrator
Browse files

Remove gevent loop to listen redis data and reemit data

Reemit dark, ref and proj spectrums towards sequence
Add CalcChannel to produce sinogram data
parent 914c0ed3
......@@ -1077,7 +1077,12 @@ class Tomo(TomoParameters):
dark_scan.run()
if self.parameters.sinogram_active:
self.tomo_sino.dark = dark_scan
dark_spectrum_acq = dark_scan.acq_chain.get_node_from_devices(self.tomo_ccd.detector.roi_profiles.counters)
self.tomo_sino.attach_channel_to_sequence(
scan_sequence,
"dark_spectrum",
dark_spectrum_acq.channels[0]
)
# test if an error has occured
if len(capture.failed) > 0:
......@@ -1185,7 +1190,12 @@ class Tomo(TomoParameters):
ref_scan.run()
if self.parameters.sinogram_active:
self.tomo_sino.ref = ref_scan
ref_spectrum_acq = ref_scan.acq_chain.get_node_from_devices(self.tomo_ccd.detector.roi_profiles.counters)
self.tomo_sino.attach_channel_to_sequence(
scan_sequence,
"ref_spectrum",
ref_spectrum_acq.channels[0]
)
# test if an error has occured
if len(capture.failed) > 0:
......
......@@ -14,7 +14,7 @@ from bliss.scanning.acquisition.motor import MotorMaster, SoftwarePositionTrigge
from bliss.scanning.acquisition.lima import LimaAcquisitionMaster, RoiProfileAcquisitionSlave
from bliss.scanning.acquisition.timer import SoftwareTimerMaster
from bliss.scanning.chain import AcquisitionChain
from bliss.scanning.channel import AcquisitionChannel
from bliss.scanning.channel import AcquisitionChannel,duplicate_channel
from bliss.scanning.scan import Scan
import tomo
......@@ -260,6 +260,7 @@ class TomoScan:
dtype=np.float)
# add slow counters from the active measurement group
if self.tomo.counters is not None:
self.tomo.counters.add_counters(musst_master, chain, self.tomo.tomo_ccd.detector.name, exposure_time, nb_points)
......
......@@ -16,7 +16,9 @@ from bliss.common.standard import *
from bliss.common.cleanup import cleanup, axis as cleanup_axis
from bliss.shell.standard import umv, umvr
from bliss.scanning.group import Sequence
from bliss.scanning.acquisition.calc import CalcChannelAcquisitionSlave
from bliss.controllers.machinfo import WaitForRefillPreset
from bliss.scanning.channel import duplicate_channel
from tomo.TomoScan import TomoScan
from tomo.TopoScan import TopoScan
......@@ -31,6 +33,7 @@ from tomo.Presets import (
from tomo.TomoMusst import TomoMusst
from tomo.ScanDisplay import ScanDisplay
from tomo.TomoTools import TopoTools
from tomo.TomoSinogram import SinoCalc
from tomo.pcotomo.PcoTomoScan import PcoTomoScan
from tomo.pcotomo.Presets import PcoTomoShutterPreset
......@@ -353,26 +356,42 @@ class FastTomo:
self.tomo.list_proj_scans.append(proj_scan)
if self.tomo.parameters.sinogram_active:
self.gscan = gevent.spawn(proj_scan.run)
while proj_scan.node is None:
gevent.sleep(0.1)
self.greemit = self.tomo.tomo_sino.create_data_reemitter(
proj_scan.node.db_name, scan_sequence.sequence
proj_spectrum_acq = proj_scan.acq_chain.get_node_from_devices(self.tomo_ccd.detector.roi_profiles.counters)
self.tomo_sino.attach_channel_to_sequence(
scan_sequence,
"proj_spectrum",
proj_spectrum_acq.channels[0]
)
self.greemit.start()
try:
self.gscan.get()
except:
# stops scan and raises exception
self.gscan.kill()
self.greemit.stop()
raise
finally:
self.greemit.get()
else:
# run the proj scan
proj_scan.run()
tomo_range = self.tomo.parameters.end_pos-self.tomo.parameters.start_pos
npixels = self.tomo.tomo_ccd.detector.roi_profiles.get_rois()[0].width
def replicate(angles):
return np.repeat(angles%tomo_range, npixels)
rotation_acq = proj_scan.acq_chain.get_node_from_devices(self.tomo.rotation_axis)
rotation_channel = [channel for chan in rotation_acq.channels if self.tomo.rotation_axis.name][0]
rotation_replicate_channel, _ , _ = duplicate_channel(rotation_channel,"rotation",replicate,numpy.float)
self.tomo_sino.attach_channel_to_sequence(
scan_sequence,
"rotation",
rotation_replicate_channel)
sino_calc = SinoCalc(scan_sequence.custom_channels["dark_spectrum"],
scan_sequence.custom_channels["ref_spectrum"],
scan_sequence.custom_channels["proj_spectrum"],
scan_sequence.custom_channels["rotation"],
self.tomo.parameters.tomo_n,
scan_sequence)
calc_sino_acq = CalcChannelAcquisitionSlave("calc_sino_acq", [scan_sequence], sino_calc, ["translation"])
chain.add(proj_spectrum_acq,calc_sino_acq)
sino_translation_channel = [channel for chan in calc_sino_acq.channels if "translation"][0]
self.tomo_sino.attach_channel_to_sequence(
scan_sequence,
"translation",
sino_translation_channel)
proj_scan.run()
log_info(self, "projection_scan() leaving")
......@@ -437,22 +456,9 @@ class FastTomo:
seq = Sequence(title=self.tomo.sequence, scan_info=scan_info)
if self.tomo.parameters.sinogram_active:
npixels = self.tomo.tomo_ccd.detector.roi_profiles.get_rois()[0].width
self.tomo.tomo_sino.configure_data_reemission(
seq,
self.tomo.rotation_axis,
self.tomo.tomo_ccd.detector.roi_profiles.counters,
npixels,
self.tomo.parameters.end_pos-self.tomo.parameters.start_pos,
)
with seq.sequence_context() as scan_seq:
self.tomo.run_sequence(scan_seq)
if self.tomo.parameters.sinogram_active:
self.tomo.tomo_sino.save(self.tomo, seq.scan.scan_number)
class HalfTomo(FastTomo, TomoParameters):
"""
......@@ -638,22 +644,9 @@ class HalfTomo(FastTomo, TomoParameters):
seq = Sequence(title=self.tomo.sequence, scan_info=scan_info)
if self.tomo.parameters.sinogram_active:
npixels = self.tomo.tomo_ccd.detector.roi_profiles.get_rois()[0].width
self.tomo.tomo_sino.configure_data_reemission(
seq,
self.tomo.rotation_axis,
self.tomo.tomo_ccd.detector.roi_profiles.counters,
npixels,
self.tomo.parameters.end_pos-self.tomo.parameters.start_pos,
)
with seq.sequence_context() as scan_seq:
self.tomo.run_sequence(scan_seq)
if self.tomo.parameters.sinogram_active:
self.tomo.tomo_sino.save(self.tomo, seq.scan.scan_number)
def scan_cleanup(self):
"""
Restores all parameters that have been modified by the acquisition.
......@@ -1921,18 +1914,6 @@ class PcoTomo(FastTomo, TomoParameters):
seq = Sequence(title=self.tomo.sequence, scan_info=scan_info)
if self.tomo.parameters.sinogram_active:
npixels = self.tomo.tomo_ccd.detector.roi_profiles.get_rois()[0].width
self.tomo.tomo_sino.configure_data_reemission(
seq,
self.tomo.rotation_axis,
self.tomo.tomo_ccd.detector.roi_profiles.counters,
npixels,
self.tomo.parameters.end_pos - self.tomo.parameters.start_pos,
)
with seq.sequence_context() as scan_seq:
self.tomo.run_sequence(scan_seq)
if self.tomo.parameters.sinogram_active:
self.tomo.tomo_sino.save(self.tomo, seq.scan.scan_number)
......@@ -2,13 +2,13 @@ from bliss.setup_globals import *
from bliss import global_map, current_session
from bliss.common.logtools import log_info
import numpy
import gevent
from bliss.common.scans.scan_info import ScanInfoFactory
from bliss.scanning.scan_info import ScanInfo
from bliss.scanning.chain import AcquisitionChannel
import gevent
from bliss.config.streaming import DataStreamReaderStopHandler
from bliss.data.node import get_node
from silx.io.nxdata import save_NXdata
from bliss.scanning.acquisition.calc import CalcChannelAcquisitionSlave, CalcHook
from bliss.scanning.channel import attach_channels,duplicate_channel
class TomoSinogram:
"""
......@@ -29,7 +29,6 @@ class TomoSinogram:
self.log_name = name+'.add_sinogram'
global_map.register(self, tag=self.log_name)
log_info(self,"__init__() entering")
self.channelmap = {}
self.dark = None
self.ref = None
log_info(self,"__init__() leaving")
......@@ -69,149 +68,71 @@ class TomoSinogram:
scan_info.add_scatter_plot(x="translation", y="rotation", value="sinogram")
return scan_info
def configure_data_reemission(self, sequence, rotation, spectrum_counters, npixels, tomo_range):
"""
Adds acquisition channels to tomo sequence object to emit sinogram
data (axis X, axis Y and spectrum values).
"""
# Prepare group channels
def replicate(angles,event):
return numpy.repeat(angles%tomo_range, npixels)
def flatten(proj,event):
dark = self.dark.get_data()[event.node.name][0].astype(int)
ref = self.ref.get_data()[event.node.name][0].astype(int)
spectra = numpy.abs(proj-dark).astype(numpy.uint32)/numpy.abs(ref-dark).astype(numpy.uint32)
return numpy.array(spectra).flatten()
def translations(spectra,event):
return flatten([numpy.arange(npixels)] * len(spectra),event)
def attach_channel_to_sequence(self,sequence,channel_name,channel_source):
seq_channel= AcquisitionChannel(channel_name}, numpy.float, ())
sequence.add_custom_channel(seq_channel)
attach_channels(seq_channel,channel_source)
sequence.add_custom_channel(
AcquisitionChannel("translation", numpy.float, (), unit="px")
)
sequence.add_custom_channel(
AcquisitionChannel("rotation", numpy.float, (), unit="degree")
)
sequence.add_custom_channel(
AcquisitionChannel("sinogram", numpy.float, ())
)
self.channelmap[f"{rotation.name}"] = [{"name": "rotation", "process": replicate}]
class SinoCalc(CalcHook):
def __init__(self, dark_channel, ref_channel, proj_channel, rotation_channel, proj_per_turn, sequence, image_to_keep = 0, image_operation = numpy.mean):
self.dark_channel = dark_channel
self.ref_channel = ref_channel
self.proj_channel = proj_channel
self.rotation_channel = rotation_channel
self.dark_data = numpy.array([])
self.ref_data = numpy.array([])
self.rotation_data = numpy.array([])
self.image_to_keep = image_to_keep
self.image_operation = image_operation
self.image_corr = numpy.array([])
self.proj_per_turn = proj_per_turn
self.sequence = sequence
def compute(self, sender, data_dict):
dark_data = data_dict.get(self.dark_channel)
if dark_data is not None:
numpy.append(self.dark_data,dark_data)
ref_data = data_dict.get(self.ref_channel)
if ref_data is not None:
numpy.append(self.ref_data,ref_data)
for counter in spectrum_counters:
rotation_data = data_dict.get(self.rotation_channel)
if rotation_data is not None:
numpy.append(self.rotation_data,rotation_data)
proj_data = data_dict.get(self.proj_channel)
if proj_data is not None:
self.dark_data = self.image_operation(self.dark_data[self.image_to_keep],axis=0)
self.ref_data = self.image_operation(self.ref_data[self.image_to_keep],axis=0)
self.channelmap[counter.fullname] = [
{"name": "sinogram", "process": flatten},
{"name": "translation", "process": translations},
]
proj_data = [numpy.arange(len(proj_data[0]))] * len(proj_data)
image_corr = (proj_data - self.dark_data).astype(numpy.uint32) / (self.ref_data - self.dark_data).astype(numpy.uint32)
numpy.append(self.image_corr,image_corr)
if len(self.image_corr) % self.proj_per_turn:
gevent.spawn(self.saving, self.image_corr, self.rotation_data, len(proj_data[0]), numpy.arange(len(proj_data[0])), self.proj_per_turn)
return {"translation":image_corr.flatten()}
def create_data_reemitter(self, db_name, sequence):
"""
Creates object that will reemit sinogram data from redis in order
to build the scatter plot.
"""
return ScanReEmitter(db_name, sequence, self.channelmap)
def save(self, tomo, sequence_nb):
def saving(self, sino_data, y_data, len_y, x_data, len_x):
motor_name = getattr(tomo.rotation_axis, 'original_name', tomo.rotation_axis.name)
if type(tomo.get_projection_scan()) == list:
y_data = []
for proj_scan in tomo.get_projection_scan():
numpy.append(y_data,proj_scan.get_data()[f'{motor_name}'])
else:
y_data = tomo.get_projection_scan().get_data()[f'{motor_name}']
y_data = numpy.repeat(y_data,tomo.tomo_ccd.detector.image.width)
x_data = numpy.arange(tomo.tomo_ccd.detector.image.width)
x_data = numpy.tile(x_data,tomo.parameters.tomo_n)
sino_data = current_session.scans[-1].get_data()['sinogram']
y_data = numpy.repeat(y_data,len_y)
x_data = numpy.tile(x_data,len_x)
save_NXdata(filename=f"{current_session.scan_saving.get_path()}/sinogram.h5",
signal=sino_data,
signal_name="values",
axes=[x_data,
y_data],
axes_names=["x", "y"],
nxentry_name=f"sinogram_{sequence_nb}",
nxdata_name=f"data_{sequence_nb}")
class ScanReEmitter(gevent.Greenlet):
"""
Class for scan data reemission.
The class implements methods to listen data events on redis, catch
desired data and reemit then.
**Attributes**
db_name : string
full name of redis data node.
Reflects the position of the node in the tree of nodes.
sequence : Bliss sequence object
contains a group of scans
channelinfo : dict
contains mapping between data channel name and data processing function.
stop_handler : Bliss data stream object
used to stop data stream reading
filter : filter
used to filter redis events
"""
def __init__(self, db_name, sequence, channelinfo, filter=None):
self.db_name = db_name
self.sequence = sequence
self.channelinfo = channelinfo
self.stop_handler = None
self.filter = filter
super().__init__()
@property
def custom_channels(self):
"""
Returns channels attached to the sequence.
"""
return self.sequence.custom_channels
def stop(self, timeout=None):
"""
Stops data reemission task
"""
try:
self.stop_handler.stop()
except AttributeError:
pass
self.join(timeout=timeout)
def _run(self):
"""
Starts data reemission task
"""
self.stop_handler = DataStreamReaderStopHandler()
try:
it = get_node(self.db_name).iterator
for event in it.walk_events(
filter=self.filter, stop_handler=self.stop_handler
):
if event.type == event.type.END_SCAN:
break
elif event.type == event.type.NEW_DATA:
infos = self.channelinfo.get(event.node.name, [])
for info in infos:
self.reemit(event, info.get("name"), info.get("process"))
finally:
self.stop_handler = None
def reemit(self, event, name, process=None):
"""
Reemits data corresponding to channels name belonging to mapping dictionary
according to processing function.
"""
try:
channel = self.custom_channels.get(name)
if channel is None:
return
data = event.data.data
if callable(process):
data = process(data,event)
channel.emit(data)
except Exception as e:
raise ValueError(f"Error re-emitting {name}") from e
nxentry_name=f"sinogram_{self.sequence.scan.scan_number}",
nxdata_name=f"data_{self.sequence.scan.scan_number}")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment