Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • bliss/tango-servers/sample_changer
1 result
Show changes
Commits on Source (14)
Showing
with 515 additions and 426 deletions
*.pyc
/dist
/build
/src/*.egg-info
/.*
stages:
- test
test:
stage: test
image: condaforge/mambaforge:latest
before_script:
- eval "$(/opt/conda/bin/conda shell.posix hook)" # Enable activate/deactivate
- mamba create --name=test python=3.10 pytango
- conda activate test
- pip install .[test]
script:
- pytest tests -s
variables:
PIP_CACHE_DIR: /opt/cache/pip
# SampleChanger
Handles the sample changer robot for micro tomography.
The robot can be installed on different tomgraphy end stations and aligns
itself with the set-up used.
## Robot controller
The class can use few controllers from the [StaubLink project](https://gitlab.esrf.fr/watier/StaubLink).
to handle all functionality of the robot.
- `id19Controller.py`
- `bm05Controller2.py`
## TomoSampleChanger
The TomoSampleChanger have to be defined with the following properties.
- `robot_controller`: One of `bm05` or `id19`
- `robot_host`: The host name or IP of the robot
[build-system]
# Minimum requirements for the build system to execute.
requires = ["setuptools>=61.0", "wheel"] # PEP 508 syntax
build-backend = "setuptools.build_meta"
[project]
name = "samplechanger"
version = "0.2.0"
authors = [
{ name="Clemence Muzelle", email="clemence.muzelle@esrf.fr" },
]
description = "Sample changer software"
requires-python = ">=3.8"
classifiers = [
"Development Status :: 2 - Pre-Alpha",
"Intended Audience :: Developers",
"Natural Language :: English",
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)",
]
license = {text = "GNU Lesser General Public License v3"}
readme = "README.md"
dependencies = []
[project.optional-dependencies]
dev = ["black"]
test = ["pytest", "mock", "pytest-mock", "nosqltangodb"]
[project.scripts]
TomoSampleChanger = "samplechanger.sample_changer_ds.TomoSampleChanger:main"
TomoSamples = "samplechanger.samples_ds.TomoSamples:main"
[project.urls]
"Homepage" = "https://gitlab.esrf.fr/bliss/sample_changer"
"Bug Tracker" = "https://gitlab.esrf.fr/bliss/sample_changer/issues"
[tool.flake8]
# Check that this is aligned with your other tools like Black
max-line-length = 120
exclude = [
# No need to traverse our git directory
".git",
# There's no value in checking cache directories
"__pycache__"
]
# Use extend-ignore to add to already ignored checks which are anti-patterns like W503.
extend-ignore = [
# PEP 8 recommends to treat : in slices as a binary operator with the lowest priority, and to leave an equal
# amount of space on either side, except if a parameter is omitted (e.g. ham[1 + 1 :]).
# This behaviour may raise E203 whitespace before ':' warnings in style guide enforcement tools like Flake8.
# Since E203 is not PEP 8 compliant, we tell Flake8 to ignore this warning.
# https://black.readthedocs.io/en/stable/the_black_code_style/current_style.html#slices
"E203"
]
\ No newline at end of file
......@@ -5,5 +5,5 @@ exclude = docs
# Define setup.py command aliases here
test = pytest
[tool:pytest]
addopts = --cov SampleChanger --cov-report html:tests/htmlcov --cov-report term -v tests/
#[tool:pytest]
#addopts = --cov SampleChanger --cov-report html:tests/htmlcov --cov-report term -v tests/
# -*- coding: utf-8 -*-
import setuptools
"""The setup script."""
import sys
from setuptools import setup, find_packages
TESTING = any(x in sys.argv for x in ['test', 'pytest'])
with open('README.rst') as readme_file:
readme = readme_file.read()
requirements=[]
setup_requirements = ['pytest-runner', 'pytest'] if TESTING else []
test_requirements = ['pytest-cov', 'mock']
setup(
author="Clemence Muzelle",
author_email='clemence.muzelle@esrf.fr',
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Natural Language :: English',
"Programming Language :: Python :: 2",
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
],
description="ID19 sample changer software",
install_requires=requirements,
license="GNU Lesser General Public License v3",
long_description=readme,
include_package_data=True,
keywords='samplechanger',
name='samplechanger',
packages=find_packages(include=['samplechanger']),
setup_requires=setup_requirements,
test_suite='tests',
tests_require=test_requirements,
url='https://gitlab.esrf.fr/id19/SampleChanger',
version='0.1.0',
zip_safe=False,
)
setuptools.setup()
File moved
......@@ -19,13 +19,13 @@ import math
class OffsetsCalculation:
def __init__(self,resolution,imageDir,stand,borderType):
#monkey.patch_all(thread=False)
self.totalProcessingTime = 0
#self.sampleSizeX = sampleSize[0]
#self.sampleSizeY = sampleSize[1]
#self.sampleSizeZ = sampleSize[2]
......@@ -36,79 +36,79 @@ class OffsetsCalculation:
self.pathImg = '/users/muzelle/Desktop/success2/007_HA-300_6.5_NUS2__004_/007_HA-300_6.5_NUS2__004_0000.edf'
self.pathFlat = '/users/muzelle/Desktop/success2/007_HA-300_6.5_NUS2__004_/refHST0000.edf'
self.pathDark = '/users/muzelle/Desktop/success2/007_HA-300_6.5_NUS2__004_/dark.edf'
self.resolution = resolution
self.stand = stand
self.borderType = borderType
self.img = None
self.flat = None
self.dark = None
self.dispImg = None
self.intImg = None
self.centImgY = None
self.centImgX = None
self.centerY = None
self.centerX = None
self.leftBorder = None
self.rightBorder = None
self.borderDetected = "false"
def imagesOpening(self):
start = time.time()
self.img = fabio.open(self.pathImg)
self.img = fabio.open(self.pathImg)
self.flat = fabio.open(self.pathFlat)
self.dark = fabio.open(self.pathDark)
self.centImgY = int(self.img.data.shape[0]/2)
self.centImgX = int(self.img.data.shape[1]/2)
imagesOpeningTime = time.time()-start
self.totalProcessingTime += imagesOpeningTime
print("Time to open images: "+str(imagesOpeningTime))
def imageContrastEnhancement(self):
start = time.time()
filtImg = np.subtract(self.img.data,self.dark.data).astype(np.float32)
filtImgFlat = np.subtract(self.flat.data,self.dark.data).astype(np.float32)
corrImg = np.divide(filtImg,filtImgFlat).astype(np.float32)
corrImg[corrImg < 0] = 0
enhImg = cv2.normalize(corrImg, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
self.img = corrImg
imageContrastEnhancementTime = time.time()-start
self.totalProcessingTime += imageContrastEnhancementTime
print("Time to enhance image contrast: "+str(imageContrastEnhancementTime))
def imageFiltering(self):
start = time.time()
blurImg = cv2.bilateralFilter(self.img,9,75,75)
blurImg = cv2.normalize(blurImg, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
self.img = blurImg
self.dispImg = blurImg
imageFilteringTime = time.time()-start
self.totalProcessingTime += imageFilteringTime
print("Time to filter image: "+str(imageFilteringTime))
def edgesDetection(self):
start = time.time()
......@@ -122,16 +122,16 @@ class OffsetsCalculation:
thr = np.median(edges)*1.33
edges = np.uint8(edges < -thr)
#measures = cv2.connectedComponentsWithStats(edges, 8, cv2.CV_32S)
#labImg = measures[1]
#areas = measures[2][:,cv2.CC_STAT_AREA]
labImg = measure.label(edges, neighbors=8, background=0)
labImg += 1
labImg += 1
props = measure.regionprops(labImg)
......@@ -142,149 +142,149 @@ class OffsetsCalculation:
labImg = morphology.remove_small_objects(labImg,noise,in_place=True)
edges = np.uint8(labImg > 0)
#self.dispImg[edges > 0] = 0
#savedImg = fabio.edfimage.EdfImage()
#savedImg.data = self.dispImg
#savedImg.write('/segfs/tango/tmp/clemence/SampleAlignment/edgesDetected.edf')
self.img = labImg
edgesDetectionTime = time.time()-start
self.totalProcessingTime += edgesDetectionTime
print("Time to find edges: "+str(edgesDetectionTime))
def sampleCenterXCalculation(self):
start = time.time()
props = measure.regionprops(self.img,self.dispImg)
centroids = [(p.centroid[0],p.centroid[1]) for p in props if p.max_intensity > 200]
if len(centroids) != 0:
self.borderDetected = "true"
if self.stand == 'absent':
if self.borderType == 'straight':
self.leftBorder = np.min(centroids,axis=0)[1]
self.rightBorder = np.max(centroids,axis=0)[1]
self.centerX = int((self.rightBorder - self.leftBorder)/2+self.leftBorder)
self.centerX = int((self.rightBorder - self.leftBorder)/2+self.leftBorder)
if self.borderType == 'irregular':
edges = self.img > 0
self.leftBorder = np.min(centroids,axis=0)[1]
print('left border: '+str(self.leftBorder))
self.rightBorder = np.max(centroids,axis=0)[1]
print('right border: '+str(self.rightBorder))
self.centerX = int(np.mean([(np.min(np.where(edges[raw,:])[0])+np.max(np.where(edges[raw,:])[0]))/2
for raw in range(edges.shape[0])
if len(np.where(edges[raw,:])[0]) != 0
for raw in range(edges.shape[0])
if len(np.where(edges[raw,:])[0]) != 0
and (np.max(np.where(edges[raw,:])[0])-np.min(np.where(edges[raw,:])[0])) > 200]
))
if self.borderType == 'oblique':
edges = self.img > 0
areas = [p.area for p in props]
fullBorder = [p.label for p in props if p.area == np.max(areas)][0]
centerRawByRaw = np.array([((np.min(np.where(edges[raw,:])[0])+np.max(np.where(edges[raw,:])[0]))/2,raw)
for raw in range(edges.shape[0])
if len(np.where(edges[raw,:])[0]) != 0 and
for raw in range(edges.shape[0])
if len(np.where(edges[raw,:])[0]) != 0 and
(np.max(np.where(edges[raw,:])[0])-np.min(np.where(edges[raw,:])[0])) > 100])
meanCenter = np.median(centerRawByRaw,axis=0)
rows = np.where(self.img == fullBorder)[0]
cols = np.where(self.img == fullBorder)[1]
ya = np.mean(rows[np.where(cols == np.min(cols))])
yb = np.mean(rows[np.where(cols == np.max(cols))])
xb = np.max(cols)
xa = np.min(cols)
slope = (yb-ya)/(xb-xa)
self.centerX = (self.centImgY-meanCenter[1])/slope + meanCenter[0]
offsetSX = (self.centImgX-self.centerX)*self.resolution*10**-3
centerXCalculationTime = time.time()-start
self.totalProcessingTime += centerXCalculationTime
print("Time to calculate sample center X: "+str(centerXCalculationTime))
return offsetSX
def sampleCenterYCalculation(self):
pass
def localTomoCenterXCalculation(self):
start = time.time()
props = measure.regionprops(self.img,self.intImg)
areas = [p.area for p in props]
border = [p.centroid for p in props if p.area == np.max(areas) and p.max_intensity > 200]
if len(border) == 0:
offsetSX = self.img.shape[1]
else:
ret,binImg = cv2.threshold(self.dispImg,0,1,cv2.THRESH_BINARY_INV+cv2.THRESH_OTSU)
labImg = measure.label(binImg, neighbors=8, background=0)
labImg += 1
labImg += 1
props = measure.regionprops(labImg)
areas = [p.area for p in props]
sample = [p.centroid for p in props if p.area == np.max(areas)][0]
if sample[1] > border[0][1]:
self.borderDetected = "right"
else:
else:
self.borderDetected = "left"
self.centerX = border[0][1]
offsetSX = (self.centImgX-self.centerX)*self.resolution*10**-3
centerXCalculationTime = time.time()-start
self.totalProcessingTime += centerXCalculationTime
print("Time to calculate sample center X: "+str(centerXCalculationTime))
return offsetSX
def displayResults(self):
try:
self.centerX
except NameError:
......@@ -293,26 +293,26 @@ class OffsetsCalculation:
self.centerY
except NameError:
self.centerY = 0
print('\n')
print('total processing time: '+str(self.totalProcessingTime))
print('\n')
print('\n')
print('Sample center detected: '+str(self.centerX)+' in X and: '+str(self.centerY)+' in Y')
print('Image center: '+str(self.centImgX)+' in X and: '+str(self.centImgY)+' in Y')
def displayImage(self):
edgesDetection = fabio.open('/segfs/tango/tmp/clemence/SampleAlignment/edgesDetected.edf')
qapp = qt.QApplication([])
plot = Plot2D()
plot.addImage(edgesDetection.data, legend='image')
plot.show()
qapp.exec_()
if __name__ == '__main__':
test = OffsetsCalculation(12.57,"/segfs/tango/tmp/clemence/SampleAlignment/","absent","straight")
test.imagesOpening()
test.imageContrastEnhancement()
......@@ -322,6 +322,4 @@ if __name__ == '__main__':
#test.sampleCenterXCalculation()
#test.displayResults()
#inst = OffsetsCalculation(0,"","","")
#inst.displayImage()
#inst.displayImage()
......@@ -43,35 +43,35 @@ class TomoSampleAlign(Device):
"""
__metaclass__ = DeviceMeta
# PROTECTED REGION ID(TomoSampleAlign.class_variable) ENABLED START #
green_mode = PyTango.GreenMode.Gevent
#
# Blocking operation to find sample center
#
def alignment_task(self):
try:
self.info_stream("started alignment")
self.is_error = False
self.is_moving = True
# alignment code
print ("Alignment start\n")
#gevent.sleep(3)
align = OffsetsCalculation(self.attr_Resolution_read,self.attr_ImageDirectory_read,self.attr_StandDetection_read,self.attr_BorderType_read)
align.imagesOpening()
align.imageContrastEnhancement()
align.imageFiltering()
align.edgesDetection()
if self.attr_AlignX_read:
if self.attr_LocalTomo_read == True:
offsetSX = align.localTomoCenterXCalculation()
self.attr_BorderDetected_read = align.borderDetected
print(self.attr_BorderDetected_read)
else:
offsetSX = align.sampleCenterXCalculation()
......@@ -81,22 +81,22 @@ class TomoSampleAlign(Device):
offsetSY = align.sampleCenterYCalculation()
else:
offsetSY = 0
align.displayResults()
print('Offset detected: '+str(int(offsetSX/(self.attr_Resolution_read*10**-3)))+' in X and: '+str(int(offsetSY/(self.attr_Resolution_read*10**-3)))+' in Y')
print('Relative SY moving of '+str(offsetSX))
print('Relative SZ moving of '+str(offsetSY))
self.attr_AlignmentOffsets_read = [offsetSX,offsetSY]
self.is_moving = False
self.info_stream("\nfinished alignment")
# catch all exceptions, print the traceback to the console and
# add the information to the device status until acknowledgement
except:
......@@ -106,10 +106,10 @@ class TomoSampleAlign(Device):
except PyTango.DevFailed as ex:
self.task_error = ex
PyTango.Except.print_exception(ex)
self.is_error = True
self.is_moving = False
# PROTECTED REGION END # // TomoSampleAlign.class_variable
# ----------
......@@ -183,13 +183,13 @@ class TomoSampleAlign(Device):
Device.init_device(self)
# PROTECTED REGION ID(TomoSampleAlign.init_device) ENABLED START #
# init flags for state and status
self.is_moving = False
self.is_error = False
self.running_task = None
self.task_error = None
self.attr_SampleSizeZ_read = 0.0
self.attr_SampleSizeY_read = 0.0
self.attr_SampleSizeX_read = 0.0
......@@ -201,7 +201,7 @@ class TomoSampleAlign(Device):
self.attr_AlignY_read = False
self.attr_AlignmentOffsets_read = [0.0,0.0]
print("done!")
# PROTECTED REGION END # // TomoSampleAlign.init_device
def always_executed_hook(self):
......@@ -416,8 +416,8 @@ class TomoSampleAlign(Device):
_state = PyTango.DevState.MOVING
else:
_state = PyTango.DevState.ON
self.set_state(_state)
self.set_state(_state)
return _state
# PROTECTED REGION END # // TomoSampleAlign.State
......@@ -425,7 +425,7 @@ class TomoSampleAlign(Device):
def dev_status(self):
# PROTECTED REGION ID(TomoSampleAlign.Status) ENABLED START #
state = self.dev_state()
self._status = "The device state is "
if state == PyTango.DevState.FAULT:
self._status += "FAULT"
......@@ -435,7 +435,7 @@ class TomoSampleAlign(Device):
if self.attr_ImageDirectory_read == "None":
self._status += "\nAccess path to images to process is not specified. Please enter it"
if self.attr_StandDetection_read == "None":
self._status += "\nPresence or absence of stand in the field of view is not specified. Please enter it"
self._status += "\nPresence or absence of stand in the field of view is not specified. Please enter it"
if self.attr_BorderType_read == "None":
self._status += "\nNo borders type specified. Please enter the type of borders to be detected"
if self.attr_Resolution_read == None:
......@@ -444,7 +444,7 @@ class TomoSampleAlign(Device):
self._status += "MOVING"
else:
self._status += "ON"
self.set_status(self._status)
return self._status
# PROTECTED REGION END # // TomoSampleAlign.Status
......@@ -454,12 +454,12 @@ class TomoSampleAlign(Device):
@DebugIt()
def AlignSample(self):
# PROTECTED REGION ID(TomoSampleAlign.AlignSample) ENABLED START #
self.running_task = gevent.spawn(self.alignment_task)
print("Running started")
self.is_moving = True
# PROTECTED REGION END # // TomoSampleAlign.AlignSample
def is_AlignSample_allowed(self):
......@@ -474,10 +474,10 @@ class TomoSampleAlign(Device):
def main(args=None, **kwargs):
# PROTECTED REGION ID(TomoSampleAlign.main) ENABLED START #
# Enable gevents for the server
kwargs.setdefault("green_mode", PyTango.GreenMode.Gevent)
return run((TomoSampleAlign,), args=args, **kwargs)
# PROTECTED REGION END # // TomoSampleAlign.main
......
......@@ -110,7 +110,7 @@ class TomoSample(Device):
def init_device(self):
Device.init_device(self)
# PROTECTED REGION ID(TomoSample.init_device) ENABLED START #
self.attr_Name_read = "None"
self.attr_Position_read = "None"
self.attr_SizeX_read = 0.0
......@@ -120,13 +120,13 @@ class TomoSample(Device):
self.attr_AlignSY_read = 0.0
self.attr_AlignSZ_read = 0.0
self.attr_Aligned_read = False
#
# Get sample changer position from the device name
dev_name = self.get_name()
dom, fam, member = dev_name.split("/")
self.attr_Position_read = member
# PROTECTED REGION END # // TomoSample.init_device
def always_executed_hook(self):
......@@ -236,13 +236,13 @@ class TomoSample(Device):
@DebugIt()
def dev_state(self):
# PROTECTED REGION ID(TomoSample.State) ENABLED START #
if self.attr_Name_read == "None":
_state = PyTango.DevState.DISABLE
else:
_state = PyTango.DevState.ON
self.set_state(_state)
self.set_state(_state)
return _state
# PROTECTED REGION END # // TomoSample.State
......@@ -251,7 +251,7 @@ class TomoSample(Device):
@DebugIt()
def Reset(self):
# PROTECTED REGION ID(TomoSample.Reset) ENABLED START #
myself = PyTango.DeviceProxy (self.get_name())
myself.write_attribute_asynch("Name" , "None")
myself.write_attribute_asynch("SizeX" , 0)
......@@ -261,7 +261,7 @@ class TomoSample(Device):
myself.write_attribute_asynch("AlignSY" , 0)
myself.write_attribute_asynch("AlignSZ" , 0)
myself.write_attribute_asynch("Aligned" , False)
# PROTECTED REGION END # // TomoSample.Reset
# ----------
......
......@@ -27,7 +27,7 @@ from PyTango import AttrWriteType, PipeWriteType
# PROTECTED REGION ID(TomoSamples.additionnal_import) ENABLED START #
import sys
import TomoSample
from . import TomoSample
# PROTECTED REGION END # // TomoSamples.additionnal_import
......@@ -39,7 +39,7 @@ class TomoSamples(Device):
"""
__metaclass__ = DeviceMeta
# PROTECTED REGION ID(TomoSamples.class_variable) ENABLED START #
# PROTECTED REGION END # // TomoSamples.class_variable
# -----------------
......@@ -47,7 +47,7 @@ class TomoSamples(Device):
# -----------------
SamplePath = device_property(
dtype='str', default_value="id19/tomo-sample/"
dtype='str', default_value="{{prefix}}/tomo-sample/"
)
ProtectedSampleNames = device_property(
......@@ -95,33 +95,37 @@ class TomoSamples(Device):
def init_device(self):
Device.init_device(self)
# PROTECTED REGION ID(TomoSamples.init_device) ENABLED START #
self.sample_devices = {}
self.tomo_samples = {}
self.active_sample_name = "None"
self.active_sample_position = "None"
SamplePath = self.SamplePath
prefix = self.get_name().split("/")[0]
SamplePath = SamplePath.replace("{{prefix}}", prefix)
for col in ["A", "B", "C"]:
for n in range(1,19):
pos = "%s%d" % (col, n)
# The samples A7 and A8 do not exist on the sample changer shelf
if pos == "A7" or pos == "A8" or pos == "B7" or pos == "B8" or pos == "C7" or pos == "C8":
continue
dev_name = self.SamplePath + pos
dev_name = SamplePath + pos
dev_proxy = PyTango.DeviceProxy (dev_name)
self.sample_devices[pos] = dev_proxy
sample_name = dev_proxy.read_attribute("Name").value
if sample_name != "None":
self.tomo_samples[pos] = sample_name
print(self.tomo_samples)
# PROTECTED REGION END # // TomoSamples.init_device
def always_executed_hook(self):
......@@ -160,26 +164,26 @@ class TomoSamples(Device):
def read_ActiveAligned(self):
# PROTECTED REGION ID(TomoSamples.ActiveAligned_read) ENABLED START #
if self.active_sample_position == "None":
err_msg = "No active sample defined!"
PyTango.Except.throw_exception("ReadError", err_msg, "read_ActiveAligned")
aligned = False
aligned = self.sample_devices[self.active_sample_position].read_attribute("Aligned").value
return aligned
# PROTECTED REGION END # // TomoSamples.ActiveAligned_read
def write_ActiveAligned(self, value):
# PROTECTED REGION ID(TomoSamples.ActiveAligned_write) ENABLED START #
if self.active_sample_position == "None":
err_msg = "No active sample defined!"
PyTango.Except.throw_exception("WriteError", err_msg, "write_ActiveAligned")
self.sample_devices[self.active_sample_position].write_attribute("Aligned", value)
# PROTECTED REGION END # // TomoSamples.ActiveAligned_write
def is_ActiveAligned_allowed(self, attr):
......@@ -192,18 +196,18 @@ class TomoSamples(Device):
def read_Samples(self):
# PROTECTED REGION ID(TomoSamples.Samples_read) ENABLED START #
numbermap = {'A1': 1, 'A2': 2, 'A3': 3, 'A4': 4, 'A5': 5, 'A6': 6, 'A9': 7, 'A10': 8, 'A11': 9, 'A12': 10, 'A13': 11, 'A14': 12, 'A15': 13, 'A16': 14, 'A17': 15, 'A18': 16, \
'B1': 17, 'B2': 18, 'B3': 19, 'B4': 20, 'B5': 21, 'B6': 22, 'B9': 23, 'B10': 24, 'B11': 25, 'B12': 26, 'B13': 27, 'B14': 28, 'B15': 29, 'B16': 30, 'B17': 31, 'B18': 32, \
'C1': 33, 'C2': 34, 'C3': 35, 'C4': 36, 'C5': 37, 'C6': 38, 'C9': 39, 'C10': 40, 'C11': 41, 'C12': 42, 'C13': 43, 'C14': 44, 'C15': 45, 'C16': 46, 'C17': 47, 'C18': 48}
samplelist = []
#for pos in sorted(self.tomo_samples.keys()):
for pos in sorted(self.tomo_samples, key=numbermap.__getitem__):
samplelist.append(pos)
samplelist.append(self.tomo_samples[pos])
return samplelist
# PROTECTED REGION END # // TomoSamples.Samples_read
def is_Samples_allowed(self, attr):
......@@ -213,30 +217,30 @@ class TomoSamples(Device):
def read_ActiveSampleSize(self):
# PROTECTED REGION ID(TomoSamples.ActiveSampleSize_read) ENABLED START #
if self.active_sample_position == "None":
err_msg = "No active sample defined!"
PyTango.Except.throw_exception("ReadError", err_msg, "read_ActiveSampleSize")
sample_size = [0.0, 0.0, 0.0]
sample_size[0] = self.sample_devices[self.active_sample_position].read_attribute("SizeX").value
sample_size[1] = self.sample_devices[self.active_sample_position].read_attribute("SizeY").value
sample_size[2] = self.sample_devices[self.active_sample_position].read_attribute("SizeZ").value
return sample_size
# PROTECTED REGION END # // TomoSamples.ActiveSampleSize_read
def write_ActiveSampleSize(self, value):
# PROTECTED REGION ID(TomoSamples.ActiveSampleSize_write) ENABLED START #
if self.active_sample_position == "None":
err_msg = "No active sample defined!"
PyTango.Except.throw_exception("WriteError", err_msg, "write_ActiveSampleSize")
self.sample_devices[self.active_sample_position].write_attribute("SizeX", value[0])
self.sample_devices[self.active_sample_position].write_attribute("SizeY", value[1])
self.sample_devices[self.active_sample_position].write_attribute("SizeZ", value[2])
# PROTECTED REGION END # // TomoSamples.ActiveSampleSize_write
def is_ActiveSampleSize_allowed(self, attr):
......@@ -249,30 +253,30 @@ class TomoSamples(Device):
def read_ActiveSampleAlignPos(self):
# PROTECTED REGION ID(TomoSamples.ActiveSampleAlignPos_read) ENABLED START #
if self.active_sample_position == "None":
err_msg = "No active sample defined!"
PyTango.Except.throw_exception("ReadError", err_msg, "read_ActiveSampleAlignPos")
align_pos = [0.0, 0.0, 0.0]
align_pos[0] = self.sample_devices[self.active_sample_position].read_attribute("AlignSX").value
align_pos[1] = self.sample_devices[self.active_sample_position].read_attribute("AlignSY").value
align_pos[2] = self.sample_devices[self.active_sample_position].read_attribute("AlignSZ").value
return align_pos
# PROTECTED REGION END # // TomoSamples.ActiveSampleAlignPos_read
def write_ActiveSampleAlignPos(self, value):
# PROTECTED REGION ID(TomoSamples.ActiveSampleAlignPos_write) ENABLED START #
if self.active_sample_position == "None":
err_msg = "No active sample defined!"
PyTango.Except.throw_exception("WriteError", err_msg, "write_ActiveSampleAlignPos")
self.sample_devices[self.active_sample_position].write_attribute("AlignSX", value[0])
self.sample_devices[self.active_sample_position].write_attribute("AlignSY", value[1])
self.sample_devices[self.active_sample_position].write_attribute("AlignSZ", value[2])
# PROTECTED REGION END # // TomoSamples.ActiveSampleAlignPos_write
def is_ActiveSampleAlignPos_allowed(self, attr):
......@@ -291,45 +295,45 @@ class TomoSamples(Device):
@DebugIt()
def dev_state(self):
# PROTECTED REGION ID(TomoSamples.State) ENABLED START #
if len(self.tomo_samples) == 0:
_state = PyTango.DevState.DISABLE
else:
_state = PyTango.DevState.ON
self.set_state(_state)
self.set_state(_state)
return _state
# PROTECTED REGION END # // TomoSamples.State
@DebugIt()
def dev_status(self):
# PROTECTED REGION ID(TomoSamples.Status) ENABLED START #
state = self.dev_state()
self._status = "The device state is "
if state == PyTango.DevState.DISABLE:
self._status += "DISABLED"
self._status += "\nNo samples defined! Add samples first."
else:
self._status += "ON"
self.set_status(self._status)
return self._status
# PROTECTED REGION END # // TomoSamples.Status
@command(
dtype_in=('str',),
doc_in="Sampe position and sample name",
dtype_in=('str',),
doc_in="Sampe position and sample name",
)
@DebugIt()
def AddSample(self, argin):
# PROTECTED REGION ID(TomoSamples.AddSample) ENABLED START #
position = argin[0].upper()
# check for valid sample name
# Column must be A, B or C
column = position[0]
......@@ -337,35 +341,35 @@ class TomoSamples(Device):
(column != "A" and column != "B" and column != "C"):
err_msg = "Wrong sample name. The sample column in the shelf must be A, B or C"
PyTango.Except.throw_exception("WriteError", err_msg, "AddSample")
#row number must be between 1 and 18
row = int(position[1:3])
if row < 1 or row > 18:
err_msg = "Wrong sample name. The sample row in the shelf must be between 1 and 18"
PyTango.Except.throw_exception("WriteError", err_msg, "AddSample")
# the samples A7 and A8 don't exist!!!
if row == 7 or row == 8:
err_msg = "The samples A,B,C 7 and 8 do not exist on the sample changer shelf!"
PyTango.Except.throw_exception("WriteError", err_msg, "Addample")
if position in self.tomo_samples.keys():
err_msg = "A sample is already defined at position %s" % position
PyTango.Except.throw_exception("WriteError", err_msg, "AddSample")
self.sample_devices[position].write_attribute("Name", argin[1])
self.tomo_samples[position] = argin[1]
# PROTECTED REGION END # // TomoSamples.AddSample
@command(
dtype_in='str',
doc_in="Sample position or sample name",
dtype_in='str',
doc_in="Sample position or sample name",
)
@DebugIt()
def DeleteSample(self, argin):
# PROTECTED REGION ID(TomoSamples.DeleteSample) ENABLED START #
position = "None"
if argin.upper() in self.tomo_samples.keys():
position = argin.upper()
......@@ -374,7 +378,7 @@ class TomoSamples(Device):
if self.tomo_samples[i].lower() == argin.lower():
position = i
break
if position != "None":
self.sample_devices[position].Reset()
del self.tomo_samples[position]
......@@ -384,7 +388,7 @@ class TomoSamples(Device):
else:
err_msg = "Sample %s not found" % argin
PyTango.Except.throw_exception("WriteError", err_msg, "DeleteSample")
# PROTECTED REGION END # // TomoSamples.DeleteSample
def is_DeleteSample_allowed(self):
......@@ -397,22 +401,22 @@ class TomoSamples(Device):
@DebugIt()
def Reset(self):
# PROTECTED REGION ID(TomoSamples.Reset) ENABLED START #
for i in list(self.tomo_samples.keys()):
for i,_ in list(self.tomo_samples.items()):
# do not reset samples with protected names
found = False
for psn in self.ProtectedSampleNames:
if psn.lower() == self.tomo_samples[i].lower():
found = True;
break;
if found == False:
self.sample_devices[i].Reset()
del self.tomo_samples[i]
self.active_sample_position = "None"
self.active_sample_name = "None"
# PROTECTED REGION END # // TomoSamples.Reset
def is_Reset_allowed(self):
......@@ -421,13 +425,13 @@ class TomoSamples(Device):
# PROTECTED REGION END # // TomoSamples.is_Reset_allowed
@command(
dtype_in='str',
doc_in="Sample position or sample name",
dtype_in='str',
doc_in="Sample position or sample name",
)
@DebugIt()
def ActivateSample(self, argin):
# PROTECTED REGION ID(TomoSamples.ActivateSample) ENABLED START #
position = "None"
if argin.upper() in self.tomo_samples.keys():
position = argin.upper()
......@@ -436,7 +440,7 @@ class TomoSamples(Device):
if self.tomo_samples[i].lower() == argin.lower():
position = i
break
if position != "None":
self.active_sample_position = position
self.active_sample_name = self.tomo_samples[position]
......@@ -451,8 +455,8 @@ class TomoSamples(Device):
# PROTECTED REGION END # // TomoSamples.is_ActivateSample_allowed
@command(
dtype_in=('str',),
doc_in="Attribute name and its new value",
dtype_in=('str',),
doc_in="Attribute name and its new value",
)
@DebugIt()
def EditSample(self, argin):
......@@ -460,14 +464,18 @@ class TomoSamples(Device):
name = argin[0]
value = argin[1]
SamplePath = self.SamplePath
prefix = self.get_name().split("/")[0]
SamplePath = SamplePath.replace("{{prefix}}", prefix)
if name == "Position":
sample = []
sample.append(value)
sample.append(self.active_sample_name)
self.AddSample(sample)
prevSample = PyTango.DeviceProxy (self.SamplePath + self.active_sample_position)
newSample = PyTango.DeviceProxy (self.SamplePath + value)
prevSample = PyTango.DeviceProxy (SamplePath + self.active_sample_position)
newSample = PyTango.DeviceProxy (SamplePath + value)
newSample.write_attribute_asynch("SizeX" , prevSample.SizeX)
newSample.write_attribute_asynch("SizeY" , prevSample.SizeY)
newSample.write_attribute_asynch("SizeZ" , prevSample.SizeZ)
......@@ -478,28 +486,28 @@ class TomoSamples(Device):
self.init_device()
self.ActivateSample(newSample.Position)
elif name == "Name":
sample = PyTango.DeviceProxy (self.SamplePath + self.active_sample_position)
sample = PyTango.DeviceProxy (SamplePath + self.active_sample_position)
sample.write_attribute_asynch("Name" , value)
self.init_device()
elif name == "SizeX":
sample = PyTango.DeviceProxy (self.SamplePath + self.active_sample_position)
sample = PyTango.DeviceProxy (SamplePath + self.active_sample_position)
sample.write_attribute_asynch("SizeX" , float(value))
elif name == "SizeY":
sample = PyTango.DeviceProxy (self.SamplePath + self.active_sample_position)
sample = PyTango.DeviceProxy (SamplePath + self.active_sample_position)
sample.write_attribute_asynch("SizeY" , float(value))
elif name == "SizeZ":
sample = PyTango.DeviceProxy (self.SamplePath + self.active_sample_position)
sample = PyTango.DeviceProxy (SamplePath + self.active_sample_position)
sample.write_attribute_asynch("SizeZ" , float(value))
elif name == "AlignSX":
sample = PyTango.DeviceProxy (self.SamplePath + self.active_sample_position)
sample = PyTango.DeviceProxy (SamplePath + self.active_sample_position)
sample.write_attribute_asynch("AlignSX" , float(value))
elif name == "AlignSY":
sample = PyTango.DeviceProxy (self.SamplePath + self.active_sample_position)
sample = PyTango.DeviceProxy (SamplePath + self.active_sample_position)
sample.write_attribute_asynch("AlignSY" , float(value))
elif name == "AlignSZ":
sample = PyTango.DeviceProxy (self.SamplePath + self.active_sample_position)
sample = PyTango.DeviceProxy (SamplePath + self.active_sample_position)
sample.write_attribute_asynch("AlignSZ" , float(value))
# PROTECTED REGION END # // TomoSamples.EditSample
# ----------
......@@ -509,9 +517,9 @@ class TomoSamples(Device):
def main(args=None, **kwargs):
# PROTECTED REGION ID(TomoSamples.main) ENABLED START #
return run((TomoSample.TomoSample, TomoSamples,), args=args, **kwargs)
# PROTECTED REGION END # // TomoSamples.main
if __name__ == '__main__':
......