Commits (34)
stages:
- build
- test
- deploy
build-noarch:
......@@ -13,28 +12,7 @@ build-noarch:
tags:
- conda
test-pytest:
stage: test
image: continuumio/miniconda3:latest
before_script:
- echo ${CI_PROJECT_DIR}
- /opt/conda/bin/conda init && source /root/.bashrc
- conda config --env --add channels esrf-bcu
- conda config --env --append channels conda-forge
- conda config --env --append channels tango-controls
- conda install --yes pytest fabio pytango lima-core python==3.7
- conda install dist/noarch/lima-tango-server-*.tar.bz2
script:
- pytest tests
artifacts:
paths:
- dist/
tags:
- conda
deploy_devel:
deploy-devel:
stage: deploy
environment:
name: devel/$CI_COMMIT_REF_NAME
......@@ -53,7 +31,7 @@ deploy_devel:
- stable
when: manual
deploy_stable:
deploy-stable:
stage: deploy
environment:
name: production
......
......@@ -1894,7 +1894,7 @@ class LimaCCDs(PyTango.LatestDeviceImpl) :
@Core.DEB_MEMBER_FUNCT
def readImageSeq(self, frame_seq):
deb.Param('frame_seq=%s' % frame_seq)
frame_seq = map(int, frame_seq)
frame_seq = list(map(int, frame_seq))
start, end = frame_seq[:2]
step = 1
if len(frame_seq) > 2:
......
############################################################################
# This file is part of LImA, a Library for Image Acquisition
#
# Copyright (C) : 2009-2020
# Copyright (C) : 2009-2021
# European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# CS40220 38043 Grenoble Cedex 9
# FRANCE
#
# Contact: lima@esrf.fr
#
# This is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
......@@ -28,11 +30,13 @@ from Lima.Server.plugins.Utils import getDataFromFile,BasePostProcess
class BackgroundSubstractionDeviceServer(BasePostProcess) :
BACKGROUND_TASK_NAME = 'BackGroundTask'
GET_BACKGROUND_IMAGE = "TMP_GET_BACKGROUND_IMAGE"
Core.DEB_CLASS(Core.DebModApplication,'BackgroundSubstraction')
Core.DEB_CLASS(Core.DebModApplication,'BackgroundSubstractionDeviceServer')
@Core.DEB_MEMBER_FUNCT
def __init__(self,cl,name) :
self.__background_op = None
self.__backGroundImage = Core.Processlib.Data()
self.__backgroundFile = None
self.__backgroundImage = Core.Processlib.Data()
self.get_device_properties(self.get_device_class())
self.__deleteDarkAfterRead = False
self.__offset = 0
......@@ -40,6 +44,7 @@ class BackgroundSubstractionDeviceServer(BasePostProcess) :
BasePostProcess.__init__(self,cl,name)
BackgroundSubstractionDeviceServer.init_device(self)
@Core.DEB_MEMBER_FUNCT
def set_state(self,state) :
if(state == PyTango.DevState.OFF) :
if(self.__background_op) :
......@@ -66,7 +71,7 @@ class BackgroundSubstractionDeviceServer(BasePostProcess) :
self.__background_op = extOpt.addOp(Core.BACKGROUNDSUBSTRACTION,
self.BACKGROUND_TASK_NAME,
self._runLevel+1)
self.__background_op.setBackgroundImage(self.__backGroundImage)
self.__background_op.setBackgroundImage(self.__backgroundImage)
if self.__offset :
self.__background_op.setOffset(self.__offset)
except:
......@@ -77,16 +82,23 @@ class BackgroundSubstractionDeviceServer(BasePostProcess) :
PyTango.LatestDeviceImpl.set_state(self,state)
@Core.DEB_MEMBER_FUNCT
def read_delete_dark_after_read(self,attr) :
attr.set_value(self.__deleteDarkAfterRead)
@Core.DEB_MEMBER_FUNCT
def write_delete_dark_after_read(self,attr) :
data = attr.get_write_value()
self.__deleteDarkAfterRead = data
def is_delete_dark_after_read_allowed(self,mode):
return True
@Core.DEB_MEMBER_FUNCT
def read_offset(self,attr) :
attr.set_value(self.__offset)
@Core.DEB_MEMBER_FUNCT
def write_offset(self,attr) :
offset = attr.get_write_value()
self.__offset = offset
......@@ -96,20 +108,57 @@ class BackgroundSubstractionDeviceServer(BasePostProcess) :
except AttributeError:
self.__offset = 0
raise
def is_offset_allowed(self,mode):
return True
#------------------------------------------------------------------
# Read MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_BackgroundFile(self, attr):
if self.__backgroundFile is not None:
attr.set_value(self.__backgroundFile)
else:
attr.set_value("")
#------------------------------------------------------------------
# Write MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def write_BackgroundFile(self, attr):
filename = attr.get_write_value()
self.setBackgroundImageFile(filename)
def is_BackgroundFile_allowed(self,mode):
return True
#==================================================================
#
# Background command methods
#
#==================================================================
@Core.DEB_MEMBER_FUNCT
def setBackgroundImage(self,filepath) :
deb.Param('filepath=%s' % filepath)
image = getDataFromFile(filepath)
self.__backgroundFile = filepath
if self.__deleteDarkAfterRead:
os.unlink(filepath)
self._setBackgroundImage(image)
def _setBackgroundImage(self,image):
self.__backGroundImage = image
self.__backgroundImage = image
if(self.__background_op) :
self.__background_op.setBackgroundImage(image)
@Core.DEB_MEMBER_FUNCT
def setBackgroundFile(self,filepath) :
""" new command to fit with other correction plugin api
"""
self.setBackgroundImage(filepath)
@Core.DEB_MEMBER_FUNCT
def takeNextAcquisitionAsBackground(self):
self.__get_image_task.updateBackgroundImage()
......@@ -149,6 +198,9 @@ class BackgroundSubstractionDeviceServerClass(PyTango.DeviceClass) :
# Command definitions
cmd_list = {
'setBackgroundImage':
[[PyTango.DevString,"Full path of background image file"],
[PyTango.DevVoid,""]],
'setBackgroundFile':
[[PyTango.DevString,"Full path of background image file"],
[PyTango.DevVoid,""]],
'takeNextAcquisitionAsBackground':
......@@ -160,15 +212,15 @@ class BackgroundSubstractionDeviceServerClass(PyTango.DeviceClass) :
'Stop':
[[PyTango.DevVoid,""],
[PyTango.DevVoid,""]],
}
}
# Attribute definitions
attr_list = {
'RunLevel':
[[PyTango.DevLong,
PyTango.SCALAR,
PyTango.READ_WRITE]],
[[PyTango.DevLong,
PyTango.SCALAR,
PyTango.READ_WRITE]],
'delete_dark_after_read':
[[PyTango.DevBoolean,
PyTango.SCALAR,
......@@ -177,6 +229,10 @@ class BackgroundSubstractionDeviceServerClass(PyTango.DeviceClass) :
[[PyTango.DevLong,
PyTango.SCALAR,
PyTango.READ_WRITE]],
'BackgroundFile':
[[PyTango.DevString,
PyTango.SCALAR,
PyTango.READ_WRITE]],
}
......
......@@ -125,18 +125,22 @@ class BpmDeviceServer(BasePostProcess):
self._bpmManager = None
self._BVDataTask = None
elif(state == PyTango.DevState.ON) :
if not self._bpmManager:
ctControl = _control_ref()
extOpt = ctControl.externalOperation()
self._softOp = extOpt.addOp(Core.BPM,self.BPM_TASK_NAME,
self._runLevel+1)
# 'set_state' is called many times, even with same state
# so, we need to ensure tasks are not re-created for nothing
# /!\ caution: in case of BVDataTask, a separate thread is
# started and a weakref references the task from the thread,
# the task cannot be re-created without stopping the thread
# first, otherwise the weakref resolves to 'None' in the thread
# causing exceptions.
ctControl = _control_ref()
extOpt = ctControl.externalOperation()
if self.enable_bpm_calc and not self._bpmManager:
self._softOp = extOpt.addOp(Core.BPM,self.BPM_TASK_NAME, self._runLevel+1)
self._bpmManager = self._softOp.getManager()
if self.enable_tango_event:
self._BVDataTask = BVDataTask(self)
handler = extOpt.addOp(Core.USER_SINK_TASK,
self.BVDATA_TASK_NAME,self._runLevel+2)
handler.setSinkTask(self._BVDataTask)
if self.enable_tango_event and not self._BVDataTask:
self._BVDataTask = BVDataTask(self)
handler = extOpt.addOp(Core.USER_SINK_TASK, self.BVDATA_TASK_NAME,self._runLevel+2)
handler.setSinkTask(self._BVDataTask)
PyTango.LatestDeviceImpl.set_state(self,state)
......@@ -248,40 +252,52 @@ class BpmDeviceServer(BasePostProcess):
#==================================================================
#
def get_bpm_result(self, frameNumber=None, timestamp=None):
if frameNumber==None:
t = time.time()
result = self._bpmManager.getResult()
if self.enable_bpm_calc:
if frameNumber==None:
t = time.time()
result = self._bpmManager.getResult()
else:
t = timestamp
result = self._bpmManager.getResult(0,frameNumber)
dim = _control_ref().image().getImageDim().getSize()
max_width = dim.getWidth()
max_height = dim.getHeight()
if result.errorCode != self._bpmManager.OK:
x = -1
y = -1
intensity = -1
fwhm_x = 0
fwhm_y = 0
max_intensity = 0
else:
x = self.validate_number(result.beam_center_x, max_value=max_width)
x *= self.calibration[0]
y = self.validate_number(result.beam_center_y, max_value=max_height)
y *= self.calibration[1]
intensity = self.validate_number(result.beam_intensity)
fwhm_x = self.validate_number(result.beam_fwhm_x, fallback_value=0)
fwhm_x *= self.calibration[0]
fwhm_y = self.validate_number(result.beam_fwhm_y, fallback_value=0)
fwhm_y *= self.calibration[1]
max_intensity = self.validate_number(result.max_pixel_value, fallback_value=0)
try:
profile_x = result.profile_x.buffer.astype(numpy.int)
except:
profile_x = numpy.array([],dtype=numpy.int)
try:
profile_y = result.profile_y.buffer.astype(numpy.int)
except:
profile_y = numpy.array([],dtype=numpy.int)
else:
t = timestamp
result = self._bpmManager.getResult(0,frameNumber)
dim = _control_ref().image().getImageDim().getSize()
max_width = dim.getWidth()
max_height = dim.getHeight()
if result.errorCode != self._bpmManager.OK:
x = -1
y = -1
intensity = -1
fwhm_x = 0
fwhm_y = 0
max_intensity = 0
else:
x = self.validate_number(result.beam_center_x, max_value=max_width)
x *= self.calibration[0]
y = self.validate_number(result.beam_center_y, max_value=max_height)
y *= self.calibration[1]
intensity = self.validate_number(result.beam_intensity)
fwhm_x = self.validate_number(result.beam_fwhm_x, fallback_value=0)
fwhm_x *= self.calibration[0]
fwhm_y = self.validate_number(result.beam_fwhm_y, fallback_value=0)
fwhm_y *= self.calibration[1]
max_intensity = self.validate_number(result.max_pixel_value, fallback_value=0)
try:
profile_x = result.profile_x.buffer.astype(numpy.int)
except:
t = time.time()
x = -1
y = -1
intensity = -1
fwhm_x = 0
fwhm_y = 0
max_intensity = 0
profile_x = numpy.array([],dtype=numpy.int)
try:
profile_y = result.profile_y.buffer.astype(numpy.int)
except:
profile_y = numpy.array([],dtype=numpy.int)
acq_time=t
......@@ -473,6 +489,17 @@ class BpmDeviceServer(BasePostProcess):
def is_return_bpm_profiles_allowed(self,mode) :
return True
def read_enable_bpm_calc(self, attr):
attr.set_value(self.enable_bpm_calc)
def write_enable_bpm_calc(self, attr):
flag = attr.get_write_value()
self.enable_bpm_calc = bool(flag)
def is_enable_bpm_calc_allowed(self,mode) :
return True
#==================================================================
#
# BpmClass class definition
......@@ -523,6 +550,10 @@ class BpmDeviceServerClass(PyTango.DeviceClass):
[PyTango.DevVarLongArray,
"min/max value for manual scaling",
[0,65535] ],
'enable_bpm_calc':
[PyTango.DevBoolean,
"Enable/Disable bpm calculation",
True],
}
......@@ -575,6 +606,7 @@ class BpmDeviceServerClass(PyTango.DeviceClass):
'jpeg_quality': [[PyTango.DevLong, PyTango.SCALAR, PyTango.READ_WRITE]],
'min_max': [[PyTango.DevULong64, PyTango.SPECTRUM, PyTango.READ_WRITE, 2 ]],
'return_bpm_profiles': [[PyTango.DevBoolean, PyTango.SCALAR, PyTango.READ_WRITE]],
'enable_bpm_calc': [[PyTango.DevBoolean, PyTango.SCALAR, PyTango.READ_WRITE]],
}
......
############################################################################
# This file is part of LImA, a Library for Image Acquisition
#
# Copyright (C) : 2009-2011
# Copyright (C) : 2009-2021
# European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# CS40220 38043 Grenoble Cedex 9
# FRANCE
#
# Contact: lima@esrf.fr
#
# This is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
......@@ -26,16 +28,20 @@ from Lima.Server.plugins.Utils import getDataFromFile,BasePostProcess
class FlatfieldDeviceServer(BasePostProcess) :
FLATFIELD_TASK_NAME = 'FlatField'
Core.DEB_CLASS(Core.DebModApplication, 'FlatfieldDeviceServer')
@Core.DEB_MEMBER_FUNCT
def __init__(self,cl,name) :
self.__flatFieldTask = None
self.__normalize = True
self.__flatFieldFile = None
self.__flatFieldImage = Core.Processlib.Data()
BasePostProcess.__init__(self,cl,name)
FlatfieldDeviceServer.init_device(self)
@Core.DEB_MEMBER_FUNCT
def set_state(self,state) :
if(state == PyTango.DevState.OFF) :
if(self.__flatFieldTask) :
......@@ -53,22 +59,20 @@ class FlatfieldDeviceServer(BasePostProcess) :
self.__flatFieldTask.setFlatFieldImage(self.__flatFieldImage, self.__normalize)
PyTango.LatestDeviceImpl.set_state(self,state)
def setFlatFieldImage(self,filepath) :
self.__flatFieldImage = getDataFromFile(filepath)
if(self.__flatFieldTask) :
self.__flatFieldTask.setFlatFieldImage(self.__flatFieldImage)
#==================================================================
#
# FlatfieldDeviceServer read/write attribute methods
#
#==================================================================
#------------------------------------------------------------------
# normalize attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_normalize(self,attr) :
attr.set_value(self.__normalize)
@Core.DEB_MEMBER_FUNCT
def write_normalize(self,attr) :
data = attr.get_write_value()
self.__normalize = data
......@@ -80,6 +84,52 @@ class FlatfieldDeviceServer(BasePostProcess) :
else:
return self.get_state() == PyTango.DevState.OFF
#------------------------------------------------------------------
# Read MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_FlatFieldFile(self, attr):
if self.__flatFieldFile is not None:
attr.set_value(self.__flatFieldFile)
else:
attr.set_value("")
#------------------------------------------------------------------
# Write MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def write_FlatFieldFile(self, attr):
filename = attr.get_write_value()
self.setFlatFieldFile(filename)
def is_FlatFieldFile_allowed(self,mode):
return True
#==================================================================
#
# Mask command methods
#
#==================================================================
@Core.DEB_MEMBER_FUNCT
def setFlatFieldImage(self,filepath) :
self.__flatFieldImage = getDataFromFile(filepath)
self.__flatFieldFile = filepath
if(self.__flatFieldTask) :
self.__flatFieldTask.setFlatFieldImage(self.__flatFieldImage)
@Core.DEB_MEMBER_FUNCT
def setFlatFieldFile(self, filepath):
""" new command to fit with other correction plugin api
"""
self.setFlatFieldImage(filepath)
#==================================================================
#
# FlatfieldDeviceServerClass class definition
#
#==================================================================
class FlatfieldDeviceServerClass(PyTango.DeviceClass) :
# Class Properties
class_property_list = {
......@@ -94,6 +144,9 @@ class FlatfieldDeviceServerClass(PyTango.DeviceClass) :
# Command definitions
cmd_list = {
'setFlatFieldImage':
[[PyTango.DevString,"Full path of flatfield image file"],
[PyTango.DevVoid,""]],
'setFlatFieldFile':
[[PyTango.DevString,"Full path of flatfield image file"],
[PyTango.DevVoid,""]],
'Start':
......@@ -115,7 +168,11 @@ class FlatfieldDeviceServerClass(PyTango.DeviceClass) :
[[PyTango.DevBoolean,
PyTango.SCALAR,
PyTango.READ_WRITE]],
}
'FlatFieldFile':
[[PyTango.DevString,
PyTango.SCALAR,
PyTango.READ_WRITE]],
}
#------------------------------------------------------------------
......
......@@ -357,7 +357,7 @@ class LimaTacoCCDs(PyTango.LatestDeviceImpl, object):
def DevCcdReadAll(self, frame_size):
deb.Param('frame_size=%s' % frame_size)
frame_dim = self.__getFrameDim()
nb_frames = frame_size / frame_dim.getMemSize()
nb_frames = frame_size // frame_dim.getMemSize()
control = _control_ref()
image = control.image()
image_type = image.getImageType()
......
############################################################################
# This file is part of LImA, a Library for Image Acquisition
#
# Copyright (C) : 2009-2011
# Copyright (C) : 2009-2021
# European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# CS40220 38043 Grenoble Cedex 9
# FRANCE
#
# Contact: lima@esrf.fr
#
# This is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
......@@ -27,9 +29,12 @@ from Lima.Server import AttrHelper
class MaskDeviceServer(BasePostProcess) :
MASK_TASK_NAME = 'MaskTask'
Core.DEB_CLASS(Core.DebModApplication,'MaskDeviceServer')
@Core.DEB_MEMBER_FUNCT
def __init__(self,cl,name) :
self.__maskTask = None
self.__maskFile = None
self.__maskImage = Core.Processlib.Data()
self.__Type = {'STANDARD' : 0,
......@@ -38,6 +43,7 @@ class MaskDeviceServer(BasePostProcess) :
BasePostProcess.__init__(self,cl,name)
MaskDeviceServer.init_device(self)
@Core.DEB_MEMBER_FUNCT
def set_state(self,state) :
if(state == PyTango.DevState.OFF) :
if(self.__maskTask) :
......@@ -55,6 +61,33 @@ class MaskDeviceServer(BasePostProcess) :
self.__maskTask.setMaskImage(self.__maskImage)
PyTango.LatestDeviceImpl.set_state(self,state)
#------------------------------------------------------------------
# Read MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_MaskFile(self, attr):
if self.__maskFile is not None:
attr.set_value(self.__maskFile)
else:
attr.set_value("")
#------------------------------------------------------------------
# Write MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def write_MaskFile(self, attr):
filename = attr.get_write_value()
self.setMaskFile(filename)
def is_MaskFile_allowed(self,mode):
return True
#==================================================================
#
# Mask command methods
#
#==================================================================
@Core.DEB_MEMBER_FUNCT
def setMaskImage(self,filepath) :
"""Set a mask image from a EDF filename.
......@@ -70,9 +103,16 @@ class MaskDeviceServer(BasePostProcess) :
"""
maskImage = getMaskFromFile(filepath)
self.__maskImage = maskImage
self.__maskFile = filepath
if self.__maskTask:
self.__maskTask.setMaskImage(self.__maskImage)
@Core.DEB_MEMBER_FUNCT
def setMaskFile(self, filepath):
""" new command to fit with other correction plugin api
"""
self.setMaskImage(filepath)
#------------------------------------------------------------------
# getAttrStringValueList command:
#
......@@ -105,6 +145,9 @@ class MaskDeviceServerClass(PyTango.DeviceClass) :
# Command definitions
cmd_list = {
'setMaskImage':
[[PyTango.DevString,"Full path of mask image file"],
[PyTango.DevVoid,""]],
'setMaskFile':
[[PyTango.DevString,"Full path of mask image file"],
[PyTango.DevVoid,""]],
'getAttrStringValueList':
......@@ -129,7 +172,11 @@ class MaskDeviceServerClass(PyTango.DeviceClass) :
[[PyTango.DevString,
PyTango.SCALAR,
PyTango.READ_WRITE]],
}
'MaskFile':
[[PyTango.DevString,
PyTango.SCALAR,
PyTango.READ_WRITE]],
}
#------------------------------------------------------------------
......
############################################################################
# This file is part of LImA, a Library for Image Acquisition
#
# Copyright (C) : 2009-2011
# Copyright (C) : 2009-2021
# European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# CS40220 38043 Grenoble Cedex 9
# FRANCE
#
# Contact: lima@esrf.fr
#
# This is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
......@@ -45,17 +47,24 @@ class Roi2spectrumDeviceServer(BasePostProcess) :
#--------- Add you global variables here --------------------------
ROI_SPECTRUM_TASK_NAME = "Roi2SpectrumTask"
Core.DEB_CLASS(Core.DebModApplication,'Roi2spectrumDeviceServer')
#------------------------------------------------------------------
# Device constructor
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def __init__(self,cl, name):
self.__roi2spectrumMgr = None
self.__roiName2ID = {}
self.__roiID2Name = {}
self.__currentRoiId = 0
self.__maskFile = None
self.__maskData = None
BasePostProcess.__init__(self,cl,name)
Roi2spectrumDeviceServer.init_device(self)
self.setMaskFile(self.MaskFile)
@Core.DEB_MEMBER_FUNCT
def set_state(self,state) :
if(state == PyTango.DevState.OFF) :
if(self.__roi2spectrumMgr) :
......@@ -70,6 +79,9 @@ class Roi2spectrumDeviceServer(BasePostProcess) :
self.__roi2spectrumMgr = extOpt.addOp(Core.ROI2SPECTRUM,
self.ROI_SPECTRUM_TASK_NAME,
self._runLevel)
self.__roi2spectrumMgr.setBufferSize(int(self.BufferSize))
if self.__maskData is not None:
self.__roi2spectrumMgr.setMask(self.__maskData)
self.__roi2spectrumMgr.clearCounterStatus()
PyTango.LatestDeviceImpl.set_state(self,state)
......@@ -77,32 +89,77 @@ class Roi2spectrumDeviceServer(BasePostProcess) :
#------------------------------------------------------------------
# Read BufferSize attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_BufferSize(self, attr):
value_read = self.__roi2spectrumMgr.getBufferSize()
attr.set_value(value_read)
attr.set_value(self.BufferSize)
#------------------------------------------------------------------
# Write BufferSize attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def write_BufferSize(self, attr):
data = attr.get_write_value()
self.__roi2spectrumMgr.setBufferSize(data)
self.BufferSize = int(data)
if self.__roi2spectrumMgr is not None:
self.__roi2spectrumMgr.setBufferSize(data)
def is_BufferSize_allowed(self,mode):
return True
#------------------------------------------------------------------
# Read MaskFile attribute
#------------------------------------------------------------------
def read_MaskFile(self, attr):
if self.__maskFile is not None:
attr.set_value(self.__maskFile)
else:
attr.set_value("")
#------------------------------------------------------------------
# Write MaskFile attribute
#------------------------------------------------------------------
def write_MaskFile(self, attr):
filename = attr.get_write_value()
self.setMaskFile(filename)
def is_MaskFile_allowed(self,mode):
return True
#------------------------------------------------------------------
# Read CounterStatus attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_CounterStatus(self, attr):
value_read = self.__roi2spectrumMgr.getCounterStatus()
attr.set_value(value_read)
#------------------------------------------------------------------
# Read MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_MaskFile(self, attr):
if self._maskFile is not None:
attr.set_value(self._maskFile)
else:
attr.set_value("")
#------------------------------------------------------------------
# Write MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def write_MaskFile(self, attr):
filename = attr.get_write_value()
self.setMaskFile(filename)
def is_MaskFile_allowed(self,mode):
return True
#==================================================================
#
# Roi2spectrum command methods
#
#==================================================================
@Core.DEB_MEMBER_FUNCT
def addNames(self,argin):
roi_id = []
for roi_name in argin:
......@@ -115,6 +172,7 @@ class Roi2spectrumDeviceServer(BasePostProcess) :
roi_id.append(self.__roiName2ID[roi_name])
return roi_id
@Core.DEB_MEMBER_FUNCT
def removeRois(self,argin):
if self.__roi2spectrumMgr :
self.__roi2spectrumMgr.removeRois(argin)
......@@ -122,6 +180,7 @@ class Roi2spectrumDeviceServer(BasePostProcess) :
roi_id = self.__roiName2ID.pop(roi_name,None)
self.__roiID2Name.pop(roi_id,None)
@Core.DEB_MEMBER_FUNCT
def setRois(self,argin) :
if self.__roi2spectrumMgr is None:
raise RuntimeError('should start the device first')
......@@ -137,11 +196,13 @@ class Roi2spectrumDeviceServer(BasePostProcess) :
else:
raise AttributeError('should be a vector as follow [roi_id0,x0,y0,width0,height0,...')
@Core.DEB_MEMBER_FUNCT
def getNames(self):
if self.__roi2spectrumMgr is None:
raise RuntimeError('should start the device first')
return self.__roi2spectrumMgr.getNames()
@Core.DEB_MEMBER_FUNCT
def getRois(self,argin):
if self.__roi2spectrumMgr is None:
raise RuntimeError('should start the device first')
......@@ -159,6 +220,7 @@ class Roi2spectrumDeviceServer(BasePostProcess) :
roi_list.append((roi_id, x, y, w, h))
return list(itertools.chain(*roi_list))
@Core.DEB_MEMBER_FUNCT
def getRoiModes(self,argin) :
if self.__roi2spectrumMgr is None:
raise RuntimeError('should start the device first')
......@@ -177,6 +239,7 @@ class Roi2spectrumDeviceServer(BasePostProcess) :
roi_mode_list.append(roi_mode_map[roi_mode])
return roi_mode_list
@Core.DEB_MEMBER_FUNCT
def setRoiModes(self,argin) :
roi_mode_map = {
'COLUMN_SUM': Roi2SpectrumTask.COLUMN_SUM,
......@@ -185,22 +248,31 @@ class Roi2spectrumDeviceServer(BasePostProcess) :
rois_modes = [(n, roi_mode_map[m]) for n,m in grouper(2, argin)]
self.__roi2spectrumMgr.setRoiModes(rois_modes)
@Core.DEB_MEMBER_FUNCT
def clearAllRois(self):
self.__roi2spectrumMgr.clearAllRois()
@Core.DEB_MEMBER_FUNCT
def setMaskFile(self,argin) :
if len(argin):
try:
mask = getMaskFromFile(argin)
data = getMaskFromFile(argin)
except:
raise ValueError(f"Could read mask from {argin}")
self.__maskData = data
self.__maskFile = argin
if self.__roiCounterMgr is not None:
self.__roi2spectrumMgr.setMask(mask)
self.__roi2spectrumMgr.setMask(self.__maskData)
else:
if self.__roiCounterMgr is not None:
emptyData = Core.Processlib.Data()
self.__roiCounterMgr.setMask(emptyData)
if self.__maskData is not None:
# reset the mask if needed
if self.__roiCounterMgr is not None:
emptyData = Core.Processlib.Data()
self.__roiCounterMgr.setMask(emptyData)
self.__maskData = None
self.__maskFile = None
@Core.DEB_MEMBER_FUNCT
def readImage(self,argin) :
roiId,fromImageId = argin
roi_name = self.__roiID2Name.get(roiId,None)
......@@ -231,7 +303,13 @@ class Roi2spectrumDeviceServerClass(PyTango.DeviceClass):
# Device Properties
device_property_list = {
}
'BufferSize':
[PyTango.DevShort,
"Rois buffer size",[256]],
'MaskFile':
[PyTango.DevString,
"Mask file", ""],
}
# Command definitions
......@@ -281,6 +359,10 @@ class Roi2spectrumDeviceServerClass(PyTango.DeviceClass):
[[PyTango.DevLong,
PyTango.SCALAR,
PyTango.READ_WRITE]],
'MaskFile':
[[PyTango.DevString,
PyTango.SCALAR,
PyTango.READ_WRITE]],
'CounterStatus':
[[PyTango.DevLong,
PyTango.SCALAR,
......
############################################################################
# This file is part of LImA, a Library for Image Acquisition
#
# Copyright (C) : 2009-2021
# European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
# This is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
############################################################################
import itertools
import weakref
import PyTango
import sys
import numpy
import processlib
from Lima import Core
from Lima.Server.plugins.Utils import getDataFromFile,BasePostProcess
from Lima.Server import AttrHelper
def grouper(n, iterable, padvalue=None):
return zip(*[itertools.chain(iterable, itertools.repeat(padvalue, n-1))]*n)
class AcqCallback(Core.SoftCallback):
def __init__(self,container):
Core.SoftCallback.__init__(self)
self._container = container
def prepare(self) :
#New acquisition will start
self._container._roiCollectionMgr.prepare()
self._container._roiCollectionMgr.resetHistory()
#==================================================================
# RoiCollection Class Description:
#
#
#==================================================================
class RoiCollectionDeviceServer(BasePostProcess) :
Core.DEB_CLASS(Core.DebModApplication, 'RoiCollectionDeviceServer')
#--------- Add you global variables here --------------------------
ROI_COLLECTION_TASK_NAME = "RoiCollectionTask"
#------------------------------------------------------------------
# Device constructor
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def __init__(self,cl, name):
self._mgr = None
self._maskFile = None
self._maskData = None
self._roiCollectionMgr = Core.Processlib.Tasks.RoiCollectionManager()
self._roiCollectionTask = Core.Processlib.Tasks.RoiCollectionTask(self._roiCollectionMgr)
self._acq_callback = AcqCallback(self)
BasePostProcess.__init__(self,cl,name)
RoiCollectionDeviceServer.init_device(self)
# Set from properties
self.setMaskFile(self.MaskFile)
self._roiCollectionMgr.resizeHistory(self.BufferSize)
@Core.DEB_MEMBER_FUNCT
def set_state(self,state) :
if(state == PyTango.DevState.OFF) :
if(self._mgr) :
self._mgr = None
ctControl = _control_ref()
extOpt = ctControl.externalOperation()
extOpt.delOp(self.ROI_COLLECTION_TASK_NAME)
elif(state == PyTango.DevState.ON) :
if not self._mgr:
ctControl = _control_ref()
extOpt = ctControl.externalOperation()
self._mgr = extOpt.addOp(Core.USER_SINK_TASK,self.ROI_COLLECTION_TASK_NAME,
self._runLevel)
self._mgr.setSinkTask(self._roiCollectionTask)
self._mgr.registerCallback(self._acq_callback)
PyTango.LatestDeviceImpl.set_state(self,state)
#------------------------------------------------------------------
# Read BufferSize attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_BufferSize(self, attr):
value_read = self._roiCollectionMgr.historySize()
attr.set_value(value_read)
#------------------------------------------------------------------
# Write BufferSize attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def write_BufferSize(self, attr):
data = attr.get_write_value()
self._roiCollectionMgr.resizeHistory(data)
def is_BufferSize_allowed(self,mode):
return True
#------------------------------------------------------------------
# Read CounterStatus attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_CounterStatus(self, attr):
value_read = self._roiCollectionMgr.lastFrameNumber()
attr.set_value(value_read)
#------------------------------------------------------------------
# Read MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def read_MaskFile(self, attr):
if self._maskFile is not None:
attr.set_value(self._maskFile)
else:
attr.set_value("")
#------------------------------------------------------------------
# Write MaskFile attribute
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def write_MaskFile(self, attr):
filename = attr.get_write_value()
self.setMaskFile(filename)
def is_MaskFile_allowed(self,mode):
return True
#==================================================================
#
# RoiCollection command methods
#
#==================================================================
@Core.DEB_MEMBER_FUNCT
def setMaskFile(self,argin) :
if len(argin):
try:
data = getMaskFromFile(argin)
except:
raise ValueError(f"Could read mask from {argin}")
self._roiCollectionMgr.setMask(data)
self._maskData = data
self._maskFile = argin
else:
if self._maskData is not None:
# reset the mask if needed
if self._roiCollectionMgr is not None:
emptyData = Core.Processlib.Data()
self._roiCollectionMgr.setMask(emptyData)
self._maskData = None
self._maskFile = None
@Core.DEB_MEMBER_FUNCT
def clearAllRois(self):
self._roiCollectionMgr.clearRois()
@Core.DEB_MEMBER_FUNCT
def setRois(self,argin):
if not len(argin) % 4:
roi_list = ((x,y,width,height) for x,y,width,height in grouper(4,argin))
self._roiCollectionMgr.setRois(list(roi_list))
else:
raise AttributeError('should be a vector as follow [x0,y0,width0,height0,...')
@Core.DEB_MEMBER_FUNCT
def readSpectrum(self,argin) :
result_counters = self._roiCollectionMgr.getHistory(argin)
if result_counters:
list_size = len(result_counters)
if list_size and result_counters[0].spectrum is not None:
spectrum_size = len(result_counters[0].spectrum)
first_frame_id = result_counters[0].frameNumber
returnArray = numpy.zeros(list_size * spectrum_size + 3,dtype = numpy.int)
returnArray[0:3] = (list_size,spectrum_size,first_frame_id)
indexArray = 3
for result in result_counters:
returnArray[indexArray:indexArray+spectrum_size] = result.spectrum
indexArray += spectrum_size
return returnArray
return numpy.array([],dtype = numpy.int)
#==================================================================
#
# RoiCollectionDeviceServerClass class definition
#
#==================================================================
class RoiCollectionDeviceServerClass(PyTango.DeviceClass):
Core.DEB_CLASS(Core.DebModApplication, 'RoiCollectionDeviceServerClass')
# Class Properties
class_property_list = {
}
# Device Properties
device_property_list = {
'BufferSize':
[PyTango.DevShort,
"Rois buffer size",[256]],
'MaskFile':
[PyTango.DevString,
"Mask file", ""],
}
# Command definitions
cmd_list = {
'setMaskFile':
[[PyTango.DevVarStringArray,"Full path of mask file"],
[PyTango.DevVoid,""]],
'clearAllRois':
[[PyTango.DevVoid,""],
[PyTango.DevVoid,""]],
'setRois':
[[PyTango.DevVarLongArray,"roi vector [x0,y0,width0,height0,x1,y1,width1,heigh1,...]"],
[PyTango.DevVoid,""]],
'readSpectrum':
[[PyTango.DevLong,"from which frame"],
[PyTango.DevVarLongArray,"number of spectrum,spectrum size,first frame id,spectrum0,spectrum1..."]],
'Start':
[[PyTango.DevVoid,""],
[PyTango.DevVoid,""]],
'Stop':
[[PyTango.DevVoid,""],
[PyTango.DevVoid,""]],
}
# Attribute definitions
attr_list = {
'BufferSize':
[[PyTango.DevLong,
PyTango.SCALAR,
PyTango.READ_WRITE]],
'MaskFile':
[[PyTango.DevString,
PyTango.SCALAR,
PyTango.READ_WRITE]],
'CounterStatus':
[[PyTango.DevLong,
PyTango.SCALAR,
PyTango.READ]],
'RunLevel':
[[PyTango.DevLong,
PyTango.SCALAR,
PyTango.READ_WRITE]],
}
#------------------------------------------------------------------
# RoiCollectionDeviceServerClass Constructor
#------------------------------------------------------------------
@Core.DEB_MEMBER_FUNCT
def __init__(self, name):
PyTango.DeviceClass.__init__(self, name)
self.set_type(name);
_control_ref = None
def set_control_ref(control_class_ref) :
global _control_ref
_control_ref= control_class_ref
def get_tango_specific_class_n_device() :
return RoiCollectionDeviceServerClass,RoiCollectionDeviceServer