Commit 4a0c4166 authored by ahoms's avatar ahoms

* added global lima numpy module support

* using numpy array when reading/initing HwFrameInfo::frame_ptr
* tested Frelon::Interface callbacks in Python


git-svn-id: https://scm.blissgarden.org/svn/lima/trunk@154 45c4679d-1946-429d-baad-37912b19538b
parent 33225900
import sys
import lima
import processlib
import time
import numpy as N
Data_UNDEF = 0
Data_UINT8 = 1
Data_INT8 = 2
Data_UINT16 = 3
Data_INT16 = 4
Data_UINT32 = 5
Data_INT32 = 6
Data_UINT64 = 7
Data_INT64 = 8
Data_FLOAT = 9
Data_DOUBLE = 10
class SoftRoiCallback( processlib.TaskEventCallback ):
DataType2ImageType = {
N.int8: lima.Bpp8,
N.uint8: lima.Bpp8,
N.int16: lima.Bpp16,
N.uint16: lima.Bpp16,
N.int32: lima.Bpp32,
N.uint32: lima.Bpp32
}
def __init__(self, hw_inter, buffer_save, acq_finished):
processlib.TaskEventCallback.__init__(self)
self.m_hw_inter = hw_inter
self.m_buffer_save = buffer_save
self.m_acq_finished = acq_finished
def finished(self, data):
finfo, fdim = self.data2FrameInfo(data)
self.m_buffer_save.writeFrame(finfo)
print "Creating Espia.Dev"
edev = lima.Espia.Dev(0)
hw_sync = self.m_hw_inter.getHwCtrlObj(lima.HwCap.Sync)
nb_frames = hw_sync.getNbFrames()
if finfo.acq_frame_nb == nb_frames - 1:
self.m_acq_finished.signalFinished()
print "Creating Espia.Acq"
acq = lima.Espia.Acq(edev)
def data2FrameInfo(self, data):
arr = data.buffer
arr_type = arr.dtype.type
arr_height, arr_width = arr.shape
image_type = self.DataType2ImageType[arr_type]
acqstat = acq.getStatus()
print "Whether the Acquisition is running : ", acqstat.running
buffer_ctrl = self.m_hw_inter.getHwCtrlObj(lima.HwCap.Buffer)
start_ts = buffer_ctrl.getStartTimestamp()
print "Creating Espia.BufferMgr"
buffer_cb_mgr = lima.Espia.BufferMgr(acq)
fdim = lima.FrameDim(arr_width, arr_height, image_type)
timestamp = lima.Timestamp(data.timestamp)
valid_pixels = lima.Point(fdim.getSize()).getArea()
print "Creating BufferCtrlMgr"
buffer_mgr = lima.BufferCtrlMgr(buffer_cb_mgr)
finfo = lima.HwFrameInfoType(data.frameNumber, arr, fdim,
timestamp, valid_pixels)
return finfo, fdim
print "Creating Espia.SerialLine"
eser_line = lima.Espia.SerialLine(edev)
class TestFrameCallback( lima.HwFrameCallback ):
print "Creating Frelon.Camera"
cam = lima.Frelon.Camera(eser_line);
ImageType2DataType = {
lima.Bpp8: Data_UINT8,
lima.Bpp16: Data_UINT16,
lima.Bpp32: Data_UINT32
}
def __init__(self, hw_inter, soft_roi, buffer_save, acq_finished):
lima.HwFrameCallback.__init__(self)
self.m_hw_inter = hw_inter
self.m_soft_roi = soft_roi
self.m_acq_finished = acq_finished
self.m_roi_task = processlib.Tasks.SoftRoi()
self.m_roi_cb = SoftRoiCallback(hw_inter, buffer_save,
acq_finished)
print "Creating the Hw Interface ... "
hw_inter = lima.Frelon.Interface(acq, buffer_mgr, cam)
def newFrameReady(self, frame_info):
msg = 'acq_frame_nb=%d, ' % frame_info.acq_frame_nb
fdim = frame_info.frame_dim
size = fdim.getSize()
msg += 'frame_dim=%dx%dx%d, ' % \
(size.getWidth(), size.getHeight(), fdim.getDepth())
msg += 'frame_timestamp=%.6f, ' % frame_info.frame_timestamp
msg += 'valid_pixels=%d' % frame_info.valid_pixels
print 'newFrameReady:', msg
data = self.frameInfo2Data(frame_info)
if self.m_soft_roi.isActive():
pass
else:
self.m_roi_cb.finished(data)
return True
print "Creating BufferSave"
buffer_save = lima.BufferSave(lima.BufferSave.EDF, "img", 0, ".edf", True, 1)
def frameInfo2Data(self, frame_info):
data = processlib.Data()
data.buffer = frame_info.frame_ptr
data.frameNumber = frame_info.acq_frame_nb
data.timestamp = frame_info.frame_timestamp
return data
print "Getting HW detector info"
hw_det_info = hw_inter.getHwCtrlObj(lima.HwCap.DetInfo)
print type(hw_det_info)
print "Getting HW buffer"
hw_buffer = hw_inter.getHwCtrlObj(lima.HwCap.Buffer)
print type(hw_buffer)
def main(argv):
print "Creating Espia.Dev"
edev = lima.Espia.Dev(0)
print "Getting HW Sync"
hw_sync = hw_inter.getHwCtrlObj(lima.HwCap.Sync)
print type(hw_sync)
print "Creating Espia.Acq"
acq = lima.Espia.Acq(edev)
print "Getting HW Bin"
hw_bin = hw_inter.getHwCtrlObj(lima.HwCap.Bin)
print type(hw_bin)
acqstat = acq.getStatus()
print "Whether the Acquisition is running : ", acqstat.running
print "Getting HW RoI"
hw_roi = hw_inter.getHwCtrlObj(lima.HwCap.Roi)
print type(hw_roi)
print "Creating Espia.BufferMgr"
buffer_cb_mgr = lima.Espia.BufferMgr(acq)
class TestFrameCallback( lima.HwFrameCallback ):
def __init__(self, hw_inter, soft_roi, buffer_save, acq_finished):
lima.HwFrameCallback.__init__(self)
self.m_hw_inter = hw_inter
self.m_soft_roi = soft_roi
self.m_acq_finished = acq_finished
# self.m_roi_task = lima.SoftRoi()
# self.m_roi_cb = lima.SoftRoiCallback(hw_inter, buffer_save,
# acq_finished)
print "Creating BufferCtrlMgr"
buffer_mgr = lima.BufferCtrlMgr(buffer_cb_mgr)
def newFrameReady(self, frame_info):
hw_sync = self.m_hw_inter.getHwCtrlObj(lima.HwCap.Sync)
nb_frames = hw_sync.getNbFrames()
if frame_info.acq_frame_nb == nb_frames - 1:
self.m_acq_finished.signalFinished()
return True
print "Creating Espia.SerialLine"
eser_line = lima.Espia.SerialLine(edev)
print "Creating Frelon.Camera"
cam = lima.Frelon.Camera(eser_line);
print "Creating the Hw Interface ... "
hw_inter = lima.Frelon.Interface(acq, buffer_mgr, cam)
print "Creating BufferSave"
buffer_save = lima.BufferSave(lima.BufferSave.EDF, "img", 0, ".edf",
True, 1)
print "Getting HW detector info"
hw_det_info = hw_inter.getHwCtrlObj(lima.HwCap.DetInfo)
print "Getting HW buffer"
hw_buffer = hw_inter.getHwCtrlObj(lima.HwCap.Buffer)
print "Getting HW Sync"
hw_sync = hw_inter.getHwCtrlObj(lima.HwCap.Sync)
print "Getting HW Bin"
hw_bin = hw_inter.getHwCtrlObj(lima.HwCap.Bin)
print "Getting HW RoI"
hw_roi = hw_inter.getHwCtrlObj(lima.HwCap.Roi)
soft_roi = lima.Roi()
acq_finished = lima.AcqFinished()
print "Creating a TestFrameCallback"
cb = TestFrameCallback(hw_inter, soft_roi, buffer_save, acq_finished)
soft_roi = lima.Roi()
acq_finished = lima.AcqFinished()
print "Creating a TestFrameCallback"
cb = TestFrameCallback(hw_inter, soft_roi, buffer_save, acq_finished)
do_reset = False
if do_reset:
print "Reseting hardware ..."
hw_inter.reset(lima.HwInterface.HardReset)
print " Done!"
do_reset = False
if do_reset:
print "Reseting hardware ..."
hw_inter.reset(lima.HwInterface.HardReset)
print " Done!"
size = hw_det_info.getMaxImageSize()
image_type = hw_det_info.getCurrImageType()
frame_dim = lima.FrameDim(size, image_type)
size = hw_det_info.getMaxImageSize()
image_type = hw_det_info.getCurrImageType()
frame_dim = lima.FrameDim(size, image_type)
bin = lima.Bin(lima.Point(1))
hw_bin.setBin(bin)
bin = lima.Bin(lima.Point(1))
hw_bin.setBin(bin)
#Roi set_roi, real_roi;
#set_hw_roi(hw_roi, set_roi, real_roi, soft_roi);
#Roi set_roi, real_roi;
#set_hw_roi(hw_roi, set_roi, real_roi, soft_roi);
effect_frame_dim = lima.FrameDim(frame_dim) # was (frame_dim / bin)
hw_buffer.setFrameDim(effect_frame_dim)
hw_buffer.setNbBuffers(10)
hw_buffer.registerFrameCallback(cb)
effect_frame_dim = lima.FrameDim(frame_dim) # was (frame_dim / bin)
hw_buffer.setFrameDim(effect_frame_dim)
hw_buffer.setNbBuffers(10)
print "cb=", cb
hw_buffer.registerFrameCallback(cb)
hw_sync.setExpTime(2)
hw_sync.setNbFrames(3)
hw_sync.setExpTime(2)
hw_sync.setNbFrames(3)
print "Starting Acquisition"
acq_finished.resetFinished()
hw_inter.startAcq()
print "Starting Acquisition"
hw_inter.startAcq()
print "Waiting acq finished..."
acq_finished.waitFinished()
print "Acq finished!!"
print "Waiting acq finished..."
acq_finished.waitFinished()
#PoolThreadMgr::get().wait();
print "Stopping Acquisition"
hw_inter.stopAcq()
print "Stopping Acquisition"
hw_inter.stopAcq()
print "This is the End..."
print "This is the End..."
if __name__ == '__main__':
main(sys.argv)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment