Commit 65e3487e authored by Marie Claire Lagier's avatar Marie Claire Lagier
Browse files

bronkhorstWago imports base classes from bliss

parent 6858283e
Pipeline #60343 failed
import click
import tabulate
from bliss.config import settings
from bliss.comm.util import get_tango_proxy
from bliss.common.utils import ColorTags, BOLD, GREEN, YELLOW
class Bronkhorst:
def __init__(self, name, config):
self._name = name
self._config = config
self._controller = get_tango_proxy(config)
nodeadd = self._controller.nodeadd
self._chan2index = {}
for ind in range(len(nodeadd)):
self._chan2index[int(nodeadd[ind])] = ind
self._nodes = {}
self._channels = []
nodes_config = self._config.get("nodes", None)
if nodes_config is None:
raise RuntimeError("Bronkhorst {self._name}: No nodes defined")
for node_config in nodes_config:
channel = node_config.get("channel")
self._channels.append(channel)
node = self._get_bronkhorst_node(
node_config,
self,
self._chan2index[channel]
)
self._nodes[channel] = node
def _set_node_attr(self, node):
setattr(self, node._fullname, node)
def _del_node_attr(self, node):
delattr(self, node._fullname)
def __info__(self):
line1 = [""]
line2 = ["Setpoint (%)"]
line3 = ["Measure (%)"]
line4 = ["Flow (ml/min)"]
for channel, node in self._nodes.items():
if node._enabled.get():
line1.append(node._fullname)
line2.append(f"{node.setpoint:.2f}")
line3.append(f"{node.measure:.2f}")
line4.append(f"{node.flow:.2f}")
mystr = tabulate.tabulate([line1, line2, line3, line4], tablefmt="plain")
mystr += "\n"
return mystr
def _get_bronkhorst_node(self,
node_config,
controller,
index
):
node = BronkhorstNode(node_config, controller=controller,index=index)
return node
def _get_meta_data(self):
meta_data = {}
for chan, node in self._nodes.items():
if node._enabled.get():
meta_data[node._fullname] = node._get_meta_data()
return meta_data
def _print_selection(self, motivation):
print(f"\nSelect Node to {motivation}:\n")
line = [BOLD(" MFC"), BOLD("Name")]
if motivation.startswith("calibrate"):
line.append(BOLD("A"))
line.append(BOLD("B"))
if motivation.startswith("enable"):
line.append(BOLD("Enabled"))
lines = [line]
for chan, node in self._nodes.items():
if node._connected and (node._enabled.get() or motivation.startswith("enable")):
line = []
chanB = BOLD(f" {node._index+1}")
name = node._setting["name"]
line.append(f"{chanB}")
line.append(f"{name}")
if motivation.startswith("calibrate"):
conva = node._setting["conva"]
convb = node._setting["convb"]
line.append(f"{conva}")
line.append(f"{convb}")
if motivation.startswith("enable"):
line.append(node._enabled.get())
lines.append(line)
lines.append([""])
qu = BOLD("q")
lines.append([BOLD(" q"), "Quit"])
mystr = tabulate.tabulate(lines, tablefmt="plain")
print(mystr)
rep = click.prompt(f"\nYour choice (type an MFC number or {qu} to quit)", default="q")
return rep
def _rename(self, node, old_name, new_name):
delattr(self, old_name)
setattr(self, new_name, node)
def rename(self):
end = False
while not end:
rep = self._print_selection("change name")
if rep == "q":
end = True
else:
rep = int(rep)
nodeadd = self._controller.nodeadd
chan = self._controller.nodeadd[rep-1]
if chan in self._nodes.keys():
node = self._nodes[chan]
name = node._setting["name"]
name = click.prompt(f" New Name", default=name)
node._rename(name)
def calibrate(self):
end = False
while not end:
rep = self._print_selection("calibrate")
if rep == "q":
end = True
else:
rep = int(rep)
nodeadd = self._controller.nodeadd
chan = self._controller.nodeadd[rep-1]
if chan in self._nodes.keys():
node = self._nodes[chan]
print("\nTheory : Flow = A * Measure + B")
conva = node._setting["conva"]
convb = node._setting["convb"]
print(f"Current: Flow = {conva} * Measure + {convb}\n")
conva = float(click.prompt(f"A = ", default=node._setting["conva"]))
convb = float(click.prompt(f"B = ", default=node._setting["convb"]))
node._calibrate(conva, convb)
def enable(self):
end = False
while not end:
rep = self._print_selection("enable/disable")
if rep == "q":
end = True
else:
rep = int(rep)
nodeadd = self._controller.nodeadd
chan = self._controller.nodeadd[rep-1]
if chan in self._nodes.keys():
node = self._nodes[chan]
if node._connected:
if node._enabled.get():
node._enabled.set(False)
self._del_node_attr(node)
else:
node._enabled.set(True)
self._set_node_attr(node)
def close_all(self):
for channel, node in self._nodes.items():
if node._enabled.get():
node.setpoint = 0.0
def reset_all(self):
for channel, node in self._nodes.items():
node._rename("gas")
node._setting["conva"] = 1.0
node._setting["convb"] = 0.0
@property
def setpoint_all(self):
return numpy.nan
@setpoint_all.setter
def setpoint_all(self, value):
for channel, node in self._nodes.items():
if node._enabled.get():
node.setpoint=value
class BronkhorstNode:
def __init__(self, config, controller=None, index=None):
if controller is None:
self._controller = config.get("controller", None)
else:
self._controller = controller
if index is None:
self._index = config.get("index", None)
else:
self._index = index
self._channel = config.get("channel", None)
self._defined = self._tango_channel_exist(self._channel)
setting_name = f"bronkhorst_{self._controller._name}_ch{self._channel}"
setting_default = {
"name": "gas",
"conva": 1.0,
"convb": 0.0
}
self._setting = settings.HashSetting(setting_name, default_values=setting_default)
name = self._setting["name"]
self._fullname = f"a{self._index+1:02d}_{name}"
self._enabled = settings.SimpleSetting(
f"bronkhorst_node_{self._channel}_enabled",
default_value=True
)
self._connect()
if self._enabled.get():
self._controller._set_node_attr(self)
def __info__(self):
name = self._setting["name"]
mystr = f"Name : {name}\n"
mystr += f"Setpoint: {self.setpoint:.2f} %\n"
mystr += f"Measure : {self.measure:.2f} %\n"
mystr += f"Flow : {self.flow:.2f} ml/min\n"
return mystr
def _tango_channel_exist(self, channel):
nodeadd = self._controller._controller.nodeadd
if channel in self._controller._controller.nodeadd:
return True
else:
return False
def _get_meta_data(self):
meta_data = {}
meta_data["setpoint"] = self.setpoint
meta_data["measure"] = self.measure
meta_data["flow"] = self.flow
return meta_data
def _rename(self, new_name):
old_name = self._fullname
self._setting["name"] = new_name
self._fullname = f"a{self._index+1:02d}_{new_name}"
self._controller._rename(self, old_name, self._fullname)
def _calibrate(self, conva, convb):
self._setting["conva"] = float(conva)
self._setting["convb"] = float(convb)
def _connect(self):
if self.setpoint == -1 and self.measure == -1:
self._connected = False
self._enabled.set(False)
else:
self._connected = True
@property
def measure(self):
return float(self._controller._controller.readcurrent(self._index))
@property
def _fluid_temperature(self):
return float(self._controller._controller.readfluidtemperature(self._index))
@property
def _orifice(self):
return float(self._controller._controller.readorifice(self._index))
@property
def _pressure_inlet(self):
return float(self._controller._controller.readpressureinlet(self._index))
@property
def _pressure_outlet(self):
return float(self._controller._controller.readpressureoutlet(self._index))
@property
def setpoint(self):
return float(self._controller._controller.readsetpoint(self._index))
@setpoint.setter
def setpoint(self, setpoint):
self._controller._controller.writesetpoint([self._index, setpoint])
@property
def flow(self):
return self._setting["conva"] * self.measure + self._setting["convb"]
@flow.setter
def flow(self, flow):
if self._setting["conva"] != 0:
setpoint = (flow - self._setting["convb"]) / self._setting["conva"]
if setpoint < 0:
setpoint = 0
self.setpoint = setpoint
@property
def _output(self):
return float(self._controller._controller.readvalveoutput(self._index))
from bliss.controllers.bronkhorst.bronkhorst import Bronkhorst, BronkhorstNode
class BronkhorstWago(Bronkhorst):
def __init__(self, name, config):
Bronkhorst.__init__(self, name, config)
def __info__(self):
line1 = [""]
line2 = ["Setpoint (%)"]
......@@ -318,20 +20,16 @@ class BronkhorstWago(Bronkhorst):
line4.append(f"{node.flow:.2f}")
if node.valve is None:
line5.append("None")
else:
else:
line5.append(node.valve._state())
mystr = tabulate.tabulate([line1, line2, line3, line4, line5], tablefmt="plain")
mystr += "\n"
return mystr
def _get_bronkhorst_node(self,
node_config,
controller,
index
):
def _get_bronkhorst_node(self, node_config, controller, index):
node = BronkhorstWagoNode(node_config, controller, index)
return node
def _get_meta_data(self):
meta_data = {}
for chan, node in self._nodes.items():
......@@ -339,60 +37,60 @@ class BronkhorstWago(Bronkhorst):
if node.valve is None or node.valve._state() == "OPEN":
meta_data[node._fullname] = node._get_meta_data()
return meta_data
def close_all(self):
Bronkhorst.close_all(self)
for channel, node in self._nodes.items():
if node.valve is not None:
node.valve.close()
def valves_open_all(self):
for channel, node in self._nodes.items():
if node.valve is not None and node._enabled:
node.valve.open()
def valves_close_all(self):
for channel, node in self._nodes.items():
if node.valve is not None and node._enabled:
node.valve.close()
class BronkhorstWagoNode(BronkhorstNode):
def __init__(self, config, controller, index):
BronkhorstNode.__init__(self, config, controller=controller, index=index)
self._wago = config.get("wago", None)
self._wago_ch = config.get("wago_channel", None)
self.valve = None
if self._wago is not None:
self.valve = BronkhorstWagoValve(self._wago, self._wago_ch)
def __info__(self):
mystr = BronkhorstNode.__info__(self)
mystr = BronkhorstNode.__info__(self)
if self.valve is not None:
mystr += f"Valve : {self.valve.__info__()}\n"
return mystr
class BronkhorstWagoValve:
def __init__(self, wago, channel):
self._wago = wago
self._channel = channel
def __info__(self):
return self._state()
def _state(self):
state = self._wago.get(self._channel)
if state == 1:
return "OPEN"
else:
return "CLOSED"
def open(self):
self._wago.set(self._channel, 1)
def close(self):
self._wago.set(self._channel, 0)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment