Commit 38ab35d8 authored by Yoann Sallaz Damaz's avatar Yoann Sallaz Damaz
Browse files

add wagosimu

parent 40fb628f
"""
Monochromatic attenuators, controlled via wago - pneumatic actuators
yml configuration example:
plugin: bliss
name: matt
package: bm07.matt_fip
class: matt_fip
wago: $wcd07b
filters:
- att1_b0
- att1_b1
- att1_b2
- att1_b3
- att1_b4
"""
from bliss.common.logtools import *
import time
class matt_fip:
def __init__(self, name, config):
self.exec_timeout = 5
self.wago = config["wago"]
self.filters = config["filters"]
self.nb_filter = len(self.filters)
def mattin(self, filt):
log_debug(self, "In mattin(%s)", filt)
if filt >= self.nb_filter:
raise RuntimeError("Wrong filter number %d" % filt)
self.filter_set(filt, True)
def mattout(self, filt):
log_debug(self, "In mattout(%s)", filt)
if filt >= self.nb_filter:
raise RuntimeError("Wrong filter number %d" % filt)
self.filter_set(filt, False)
def mattsetall(self, flag):
log_debug(self, "In mattsetall(%s)", flag)
value = 0
if flag is True:
for i in range(self.nb_filter):
value += 1 << i
self.mattstatus_set(value)
def mattstatus_get(self):
log_debug(self, "In mattstatus_get()")
value = []
value.append(float(self.pos_read()))
return value
def mattstatus_set(self, value):
log_debug(self, "In mattstatus_set(%s)", value)
self.pos_write(value)
t0 = time.time()
check = self.pos_read()
while check != value:
time.sleep(0.5)
check = self.pos_read()
if time.time() - t0 > self.exec_timeout:
raise RuntimeError("Timeout while waiting for status to be %d" % value)
def _status_read(self):
mystr = ""
value = self.pos_read()
for i in range(self.nb_filter):
if value & (1 << i) > 0:
mystr += "%d " % i
return mystr
def pos_read(self):
log_debug(self, "In pos_read()")
ret = 0
for i in range(self.nb_filter):
ret += self.wago.get(self.filters[i]) << i
return ret
def pos_write(self, value):
log_debug(self, "In pos_write(%s)", value)
for i in range(self.nb_filter):
if value & (1 << i) > 0:
self.wago.set(self.filters[i], True)
else:
self.wago.set(self.filters[i], False)
def filter_set(self, filt, put_in):
log_debug(self, "In filter_set(%s, %s)", filt, put_in)
value = self.pos_read()
if value >= (1 << self.nb_filter):
raise RuntimeError("Filters in unknown position, exiting")
ff = 1 << filt
if put_in is True:
if (value & ff) == 0:
value += ff
else:
if (value & ff) != 0:
value &= ~ff
self.pos_write(value)
check = self.pos_read()
t0 = time.time()
while check != value:
time.sleep(0.5)
check = self.pos_read()
if time.time() - t0 > self.exec_timeout:
raise RuntimeError(
"Timeout while waiting for filter to be %s"
% ("in" if put_in is True else "out")
)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment