Commit e161a903 authored by Lucas Felix's avatar Lucas Felix
Browse files

Fix old PI_E712 emulator

parent a9a13f2e
......@@ -5,8 +5,8 @@
# Copyright (c) 2015-2022 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""Emulator :mod:`~bliss.controllers.emulator.Server` and \
:mod:`~bliss.controllers.emulator.BaseDevice`
"""Emulator :mod:`~tests.emulators.emulator.Server` and \
:mod:`~tests.emulators.emulator.BaseDevice`
Quick start
-----------
......@@ -24,7 +24,7 @@ To create a server use the following configuration as a starting point:
To start the server you can do something like::
$ python -m bliss.controllers.emulator my_emulator
$ python -m tests.emulators.emulator my_emulator
(bliss also provides a ``bliss-emulator`` script which basically does the same)
......@@ -97,7 +97,8 @@ class EmulatorServerMixin(object):
addr tuple): address (tuple of host, port)
"""
if self.newline == "\n" and not self.special_messages:
for line in file_obj:
for buf in file_obj:
line = buf.decode()
self.handle_line(sock, line)
else:
# warning: in this mode read will block even if client
......@@ -105,7 +106,7 @@ class EmulatorServerMixin(object):
buff = ""
finish = False
while not finish:
readout = file_obj.read(1)
readout = file_obj.read(1).decode()
if not readout:
return
buff += readout
......@@ -129,17 +130,13 @@ class EmulatorServerMixin(object):
sock (gevent.socket.socket): new socket resulting from an accept
addr (tuple): address (tuple of host, port)
line (str): line to be processed
Returns:
str: response to give to client or None if no response
"""
self.pause(len(line))
response = self.device.handle_line(line)
if response is not None:
self.pause(len(response))
sock.sendall(response)
return response
buf = response.encode()
self.pause(len(buf))
sock.sendall(buf)
def pause(self, nb_bytes):
"""
......@@ -416,7 +413,7 @@ def create_device(device_info):
name = device_info.pop("name", class_name)
if package_name is None:
package_name = "bliss.controllers.emulators." + module_name
package_name = "tests.emulators." + module_name
__import__(package_name)
package = sys.modules[package_name]
......
......@@ -24,7 +24,7 @@ a starting point:
To start the server you can do something like::
$ python -m bliss.controllers.emulator my_emulator
$ python -m tests.emulators.emulator my_emulator
A simple *nc* client can be used to connect to the instrument::
......@@ -37,6 +37,7 @@ A simple *nc* client can be used to connect to the instrument::
import weakref
from .scpi import SCPI
from bliss.comm.scpi import Commands, FuncCmd, IntCmdRO, IDNCmd, ErrCmd
class PI(SCPI):
......@@ -52,6 +53,7 @@ class PIAxis(object):
self.__pos = 0.0
self.__set_mov = 0.0
self.__set_sva = 0.0
self.__vel = 100.0
self.__vol = 0.0
self.__svo = 0
self.__ont = 1
......@@ -59,11 +61,11 @@ class PIAxis(object):
@property
def pos(self):
return " {0}".format(self.__pos)
return self.__pos
@property
def mov(self):
return " {0}".format(self.__set_mov)
return self.__set_mov
@mov.setter
def mov(self, new_pos):
......@@ -72,20 +74,28 @@ class PIAxis(object):
@property
def sva(self):
return " {0}".format(self.__set_sva)
return self.__set_sva
@sva.setter
def sva(self, new_pos):
self.__pos = new_pos
self.__set_sva = new_pos
@property
def vel(self):
return self.__vel
@vel.setter
def vel(self, new_vel):
self.__vel = new_vel
@property
def vol(self):
return "VOL={0}".format(self.__vol)
return self.__vol
@property
def svo(self):
return " {0}".format(self.__svo)
return self.__svo
@svo.setter
def svo(self, yesno):
......@@ -93,7 +103,7 @@ class PIAxis(object):
@property
def ont(self):
return " {0}".format(self.__ont)
return self.__ont
def cto(self, *args):
self.__cto = args
......@@ -105,8 +115,26 @@ class PI_E712(PI):
with capacitive sensors
"""
# special messages are sent without '\r'
special_messages = set([chr(9)]) # Get Wave Generator Status
def __init__(self, name, axes=None, **opts):
model = opts.pop("model", "E-712.3CD")
opts["commands"] = Commands(
{
"ERR": ErrCmd(),
"POS": FuncCmd(),
"VEL": FuncCmd(),
"MOV": FuncCmd(),
"SVA": FuncCmd(),
"SVO": FuncCmd(),
"ONT": FuncCmd(),
"CTO": FuncCmd(),
"CCL": IntCmdRO(value=0),
"*IDN": IDNCmd(),
"IFC": FuncCmd(),
}
)
super(PI_E712, self).__init__(name, **opts)
self._model = model
axes_dict = {}
......@@ -119,29 +147,61 @@ class PI_E712(PI):
for k, v in list(opts.items()):
setattr(self, "_" + k, v)
def wave_generator_status(self):
return 0
def handle_special_msg(self, line):
if line == chr(9):
return str(self.wave_generator_status())
else:
raise ValueError(f"Unknown special message '{line}'")
def err(self):
return "0"
def pos(self, channel):
return self._axes[int(channel)].pos
return f"{channel}={self._axes[int(channel)].pos}"
def vel(self, is_query, channel, new_vel=None):
axis = self._axes[int(channel)]
if is_query:
return f"{channel}={axis.vel}"
axis.vel = new_vel
def mov(self, is_query, channel, new_pos=None):
axis = self._axes[int(channel)]
if is_query:
return axis.mov
return f"{channel}={axis.mov}"
axis.mov = new_pos
def sva(self, is_query, channel, new_pos=None):
axis = self._axes[int(channel)]
if is_query:
return axis.sva
return f"{channel}={axis.sva}"
axis.sva = new_pos
def svo(self, is_query, channel, yesno=None):
axis = self._axes[int(channel)]
if is_query:
return axis.svo
return f"{channel}={axis.svo}"
axis.svo = yesno
def ont(self, channel):
return self._axes[int(channel)].ont
return f"{channel}={self._axes[int(channel)].ont}"
def cto(self, channel, trig_mode, min_threshold, max_threshold, polarity):
self._axes[int(channel)].cto(trig_mode, min_threshold, max_threshold, polarity)
def ifc(self, is_query, *args):
if is_query:
result = []
for arg in args:
if arg == "IPADR":
result.append("IPADR=123.45.67.89")
elif arg == "MACADR":
result.append("MACADR=aa:bb:cc:dd:ee:ff")
elif arg == "IPSTART":
result.append("IPSTART=0") # 0->STATIC 1->DHCP
return "\n".join(result)
else:
raise NotImplementedError
......@@ -22,7 +22,7 @@ a starting point:
To start the server you can do something like::
$ python -m bliss.controllers.emulator my_emulator
$ python -m bliss.emulators.emulator my_emulator
A simple *nc* client can be used to connect to the instrument::
......@@ -85,6 +85,9 @@ class SCPI(BaseDevice):
cmd_info["value"] = cmd_info["default"]
def handle_line(self, line):
if line in self.special_messages:
return self.handle_special_msg(line) + "\n"
self._log.info("processing line %r", line)
line = line.strip()
responses = []
......
- class: PI_E712
tcp:
url: localhost:50712
axes:
- name: pi712
channel: 1
velocity: 100
acceleration: 1.
steps_per_unit: 1
closed_loop:
state: on
- name: my_emulator
devices:
- class: PI_E712
module: pi
model: 6CD # optional (default is 3CD)
transports:
- type: tcp
url: :50712
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment