Commit 710bd412 authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'matt_fix' into 'master'

Fix bug in mapping for 570-530 module. Rename class matt to Matt.

See merge request !3915
parents 17880886 2248fc18
Pipeline #51741 passed with stages
in 104 minutes and 14 seconds
......@@ -67,33 +67,40 @@ The 750-436 and 740-430 Digital Input modules have 8 channels, while
the 750-402 and 750-408 Digital Input module has 4 channels only.
"""
from gevent import Timeout, sleep
from bliss.controllers.wago.wago import WagoController, ModulesConfig
from bliss.comm.util import get_comm
import time
from bliss.common.utils import wrap_methods
from bliss.common.logtools import log_debug
from bliss import global_map
class MattWagoMapping:
def __init__(self, nb_filter, att_type, att_alternate, stat_m, ctrl_m):
"""Create standard attenuators WAGO mapping"""
def __init__(self, nb_filter, att_alternate, stat_m, ctrl_m):
self.nb_filter = nb_filter
self.att_type = att_type
# self.att_type = att_type
self.att_alternate = att_alternate
self.mapping = []
self.generate_mapping(att_type, att_alternate, stat_m, ctrl_m)
self.generate_mapping(att_alternate, stat_m, ctrl_m)
def __repr__(self):
return "\n".join(self.mapping)
def generate_mapping(self, att_type, att_alternate, stat_m, ctrl_m):
def generate_mapping(self, att_alternate, stat_m, ctrl_m):
"""Generat the attenuators wago mapping.
Args:
att_alternate (int): alternate control/status cards.
stat_m (str): Name of the status module(s)
ctrl_m (str): Name of the control module(s).
"""
nstat = 2
nctrl = 1
STATUS_MODULE = "%s" % stat_m + ",%s"
CONTROL_MODULE = "%s" % ctrl_m + ",%s"
n_mod = 1
if stat_m == "750-402" or stat_m == "750-408":
if stat_m in ("750-402", "750-408"):
nstat = 1
n_mod = nstat * 2
......@@ -102,7 +109,7 @@ class MattWagoMapping:
STATUS = ["attstatus"] * nstat
if nctrl == 2:
CONTROL = ["attctrl,_"] * nctrl
CONTROL = ["attctrl,_"]
else:
CONTROL = ["attctrl"] * nctrl
......@@ -157,6 +164,8 @@ class MattWagoMapping:
class MattControl:
"""Control the monochriomatic attenuators"""
def __init__(
self,
wago_ip,
......@@ -166,6 +175,7 @@ class MattControl:
stat_m="750-436",
ctrl_m="750-530",
):
self.__wago = None
self.wago_ip = wago_ip
self.nb_filter = nb_filter
self.att_type = att_type
......@@ -173,21 +183,27 @@ class MattControl:
self.stat_m = stat_m
self.ctrl_m = ctrl_m
self.exec_timeout = 5
self.mapping = []
global_map.register(self)
@property
def wago(self):
"""Get the WAGO object.
Raises:
RuntimeError: Not connected to a Wago
"""
try:
return self.__wago
except AttributeError:
raise Exception("Matt is not connected to Wago")
raise RuntimeError("Not connected to a Wago")
def connect(self):
"""Connect to a WAGO. Generate the mapping."""
log_debug(self, "In connect()")
mapping = MattWagoMapping(
self.nb_filter, self.att_type, self.att_alternate, self.stat_m, self.ctrl_m
self.mapping = MattWagoMapping(
self.nb_filter, self.att_alternate, self.stat_m, self.ctrl_m
)
modules_config = ModulesConfig(str(mapping), ignore_missing=True)
modules_config = ModulesConfig(str(self.mapping), ignore_missing=True)
log_debug(self, "Trying to connect to Wago")
conf = {"modbustcp": {"url": self.wago_ip}}
......@@ -200,10 +216,15 @@ class MattControl:
global_map.register(self, children_list=[self.wago])
def exit(self):
"""Close the connectionn to the WAGO."""
log_debug(self, "In exit()")
self.wago.close()
def pos_read(self):
"""Read the position of the filters.
Returns:
(int): The value, representing the addition of the active bits.
"""
log_debug(self, "In pos_read()")
ret = 0
nstat = 2
......@@ -223,6 +244,10 @@ class MattControl:
return ret
def read_1posbit(self, stat):
"""Read the value for 1 position bit configuration.
Returns:
(int): The value, representing the addition of the active bits.
"""
log_debug(self, "In read_1posbit(%s)", stat)
ret = 0
for i in range(self.nb_filter):
......@@ -230,6 +255,10 @@ class MattControl:
return ret
def read_2posbit_odd(self, stat):
"""Read the value for 2 position odd bits configuration.
Returns:
(int): The value, representing the addition of the active bits.
"""
log_debug(self, "In read_2posbit_odd(%s)", stat)
ret = 0
nstat = 2
......@@ -242,6 +271,10 @@ class MattControl:
return ret
def read_2posbit(self, stat):
"""Read the value for 2 position bits configuration.
Returns:
(int): The value, representing the addition of the active bits.
"""
log_debug(self, "In read_2posbit(%s)", stat)
ret = 0
nstat = 2
......@@ -256,6 +289,11 @@ class MattControl:
return ret
def read_2posbit_alternate(self, stat):
"""Read the value for 2 position bits, alternatingn status modules
configuration.
Returns:
(int): The value, representing the addition of the active bits.
"""
log_debug(self, "In read_2posbit_alternate(%s)", stat)
ret = 0
nstat = 2
......@@ -267,6 +305,10 @@ class MattControl:
return ret
def pos_write(self, value):
"""Write a position value to the control module(s).
Args:
(int): The value, representing the addition of the active bits.
"""
log_debug(self, "In pos_write(%s)", value)
valarr = [False] * self.nb_filter
......@@ -274,10 +316,13 @@ class MattControl:
if value & (1 << i) > 0:
valarr[i] = True
valarr.insert(0, "attctrl")
self.wago.set(valarr)
self.wago.set("attctrl", valarr)
def _status_read(self):
"""Read the status of the WAGO, Retunt the the active modules only.
Returns:
(str): String with the active modules. Empty string if none active.
"""
mystr = ""
value = self.pos_read()
......@@ -287,6 +332,10 @@ class MattControl:
return mystr
def status_read(self):
"""Read the status of the attenuators - human readbel representation.
Retuns:
(list): List of 2 strings - filter names, filter status.
"""
log_debug(self, "In status_read()")
stat = []
mystr = ""
......@@ -310,118 +359,156 @@ class MattControl:
stat.append(mystr)
return stat
def matt_set(self, val):
log_debug(self, "In matt_set(%s)", val)
def matt_set(self, value):
"""Write the bit value in the WAGO. Check if the status corresponds
to the value. First insert the new filters, than extract the rest.
Args:
value(int): The value, representing the addition of the active bits
Raises:
RuntimeError: Filters in unknown position
Timeout waiting for status to be the same as value.
"""
log_debug(self, "In matt_set(%s)", value)
oldvalue = self.pos_read()
if oldvalue >= (1 << self.nb_filter):
raise RuntimeError("Filters in unknown position, exiting")
raise RuntimeError("Filters in unknown position")
if oldvalue == val:
if oldvalue == value:
return oldvalue
# first insert the new filters, leave untouched the old ones
if (~val & oldvalue) and (val & ~oldvalue):
self.pos_write(val | oldvalue)
time.sleep(0.25)
if (~value & oldvalue) and (value & ~oldvalue):
self.pos_write(value | oldvalue)
sleep(0.25)
# than redo to extract the old filters
self.pos_write(val)
self.pos_write(value)
check = self.pos_read()
t0 = time.time()
while check != val:
time.sleep(0.5)
check = self.pos_read()
if time.time() - t0 > self.exec_timeout:
raise RuntimeError("Timeout while waiting for filters to be %d" % val)
with Timeout(
self.exec_timeout,
RuntimeError("Timeout waiting for status to be %d" % value),
):
while check != value:
sleep(0.2)
check = self.pos_read()
def filter_set(self, filt, put_in):
"""Insert/extract a filter.
Args:
filt (int): Filter number (start from 0).
put_in (bool): True if to be put in, False otherwise.
Raises:
RuntimeError: Filters in unknown position
Timeout waiting for filter to be inserted/extracted.
"""
log_debug(self, "In filter_set(%s, %s)", filt, put_in)
value = self.pos_read()
if value >= (1 << self.nb_filter):
raise RuntimeError("Filters in unknown position, exiting")
ff = 1 << filt
raise RuntimeError("Filters in unknown position")
_ff = 1 << filt
if put_in is True:
if (value & ff) == 0:
value += ff
if (value & _ff) == 0:
value += _ff
else:
if (value & ff) != 0:
value &= ~ff
if (value & _ff) != 0:
value &= ~_ff
self.pos_write(value)
check = self.pos_read()
t0 = time.time()
while check != value:
time.sleep(0.5)
check = self.pos_read()
if time.time() - t0 > self.exec_timeout:
raise RuntimeError(
"Timeout while waiting for filter to be %s"
% ("in" if put_in is True else "out")
)
with Timeout(
self.exec_timeout,
RuntimeError(
"Timeout while waiting for filter to be %s"
% ("in" if put_in is True else "out")
),
):
while check != value:
sleep(0.2)
check = self.pos_read()
def mattin(self, filt):
"""Insert a filter.
Args:
filt (int): Filter number (start from 0).
"""
log_debug(self, "In mattin(%s)", filt)
if filt >= self.nb_filter:
raise RuntimeError("Wrong filter number %d" % filt)
self.filter_set(filt, True)
def mattout(self, filt):
"""Extract a filter.
Args:
filt (int): Filter number (start from 0).
"""
log_debug(self, "In mattout(%s)", filt)
if filt >= self.nb_filter:
raise RuntimeError("Wrong filter number %d" % filt)
self.filter_set(filt, False)
def mattsetall(self, flag):
log_debug(self, "In mattsetall(%s)", flag)
def mattsetall(self, put_in):
"""Insert/extract al the filters.
Args:
put_in (bool): True if to be put in, False otherwise.
"""
log_debug(self, "In mattsetall(%s)", put_in)
value = 0
if flag is True:
if put_in:
for i in range(self.nb_filter):
value += 1 << i
self.mattstatus_set(value)
def mattstatus_get(self):
"""Read the bit value in the WAGO. The status represents the
list of the active bits.
Returns:
(list): The status value as a list
Raises:
RuntimeError: Timeout waiting for status to be the same as value.
"""
log_debug(self, "In mattstatus_get()")
value = []
value.append(float(self.pos_read()))
value.append(self.pos_read())
return value
def mattstatus_set(self, value):
"""Write the bit value in the WAGO. Check if the status corresponds
to the value.
Args:
value(int): The value, representing the addition of the active bits
Raises:
RuntimeError: Timeout waiting for status to be the same as value.
"""
log_debug(self, "In mattstatus_set(%s)", value)
self.pos_write(value)
t0 = time.time()
check = self.pos_read()
while check != value:
time.sleep(0.5)
check = self.pos_read()
if time.time() - t0 > self.exec_timeout:
raise RuntimeError("Timeout while waiting for status to be %d" % value)
with Timeout(
self.exec_timeout,
RuntimeError("Timeout waiting for status to be %d" % value),
):
while check != value:
sleep(0.2)
check = self.pos_read()
class matt:
class Matt:
"""Controller class"""
def __init__(self, name, config):
self.name = name
wago_ip = config["controller_ip"]
nb_filter = config["nb_filter"]
try:
# attenuator type (0,1 or 2, default is 0)
att_type = config["att_type"]
except:
att_type = 0
try:
# wago card alternation (True or False, default False)
wago_alternate = config["wago_alternate"]
except:
wago_alternate = False
try:
# wago status module (default value "750-436")
stat_m = config["status_module"]
except:
stat_m = "750-436"
try:
# wago control module (default value "750-530")
ctrl_m = config["control_module"]
except:
ctrl_m = "750-530"
# attenuator type (0,1 or 2, default is 0)
att_type = config.get("att_type", 0)
# wago card alternation (True or False, default False)
wago_alternate = config.get("wago_alternate", False)
# wago status module (default value "750-436")
stat_m = config.get("status_module", "750-436")
# wago control module (default value "750-530")
ctrl_m = config.get("control_module", "750-530")
self.exec_timeout = int(config.get("timeout", 5))
self.__control = MattControl(
wago_ip, nb_filter, att_type, wago_alternate, stat_m, ctrl_m
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment