pi_hexa.py 6.98 KB
Newer Older
1 2 3 4 5 6 7 8
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2018 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

import enum
Geoffrey Mant's avatar
Geoffrey Mant committed
9 10
import time
import gevent
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
from bliss.comm.util import get_comm, get_comm_type, TCP, SERIAL
from bliss.controllers.motor import Controller
from bliss.common.axis import AxisState
from .pi_gcs import get_error_str

"""
Bliss controller for controlling the Physik Instrumente hexapod
controllers 850 and 887.

The Physik Instrument Hexapod M850 is a hexapod controller with
a serial line interface.
The Physik Instrument Hexapod C887 is a hexapod controller with
a serial line and socket interfaces. Both of them can be used.

config example:
- class PI_HEXA
  model: 850 # 850 or 887 (optional)
  serial:
    url: ser2net://lid133:28000/dev/ttyR37
  axes:
    - name: hexa_x
      channel: X
    - name: hexa_y
      channel: Y
    - name: hexa_z
      channel: Z
    - name: hexa_u
      channel: U
    - name: hexa_v
      channel: V
    - name: hexa_w
      channel: W
"""

45

46
def _atomic_communication(fn):
47
    def f(self, *args, **kwargs):
48
        with self._cnx.lock:
49 50
            return fn(self, *args, **kwargs)

51
    return f
52

53

54
class PI_HEXA(Controller):
bliss administrator's avatar
bliss administrator committed
55 56 57
    COMMAND = enum.Enum(
        "PI_HEXA.COMMAND", "POSITIONS MOVE_STATE MOVE_SEP INIT STOP_ERROR"
    )
58

59 60 61 62 63 64 65 66 67 68 69
    def __init__(self, *args, **kwargs):
        Controller.__init__(self, *args, **kwargs)

        self._cnx = None
        self.controler_model = None
        self._commands = dict()

    def initialize(self):
        """
        Initialize the communication to the hexapod controller
        """
70 71 72 73
        # velocity and acceleration are not mandatory in config
        self.axis_settings.config_setting["velocity"] = False
        self.axis_settings.config_setting["acceleration"] = False

74
        comm_type = get_comm_type(self.config.config_dict)
75
        comm_option = {"timeout": 30.}
76
        if comm_type == TCP:
77 78 79
            comm_option["ctype"] = TCP
            comm_option.setdefault("port", 50000)
            controler_model = self.config.get("model", int, 887)
80
        elif comm_type == SERIAL:
81 82 83
            comm_option.setdefault("baudrate", 57600)
            comm_option["ctype"] = SERIAL
            controler_model = self.config.get("model", int, 850)
84
        else:
85 86 87
            raise ValueError(
                "PI_HEXA: communication of type (%s) " "not yet managed" % comm_type
            )
88 89 90

        model_list = [850, 887]
        if controler_model not in model_list:
91 92 93 94
            raise ValueError(
                "PI_HEXA: model %r not managed,"
                "only managed model %r" % (controler_model, model_list)
            )
95 96 97 98
        self.controler_model = controler_model

        self._cnx = get_comm(self.config.config_dict, **comm_option)

99 100 101 102 103 104 105
        commands = {
            850: {
                self.COMMAND.POSITIONS: "POS?",
                #                           self.COMMAND.MOVE_STATE : ("MOV?", lambda x: 0 if x == '1' else 1),
                self.COMMAND.MOVE_STATE: ("\5", lambda x: int(x)),
                self.COMMAND.MOVE_SEP: "",
                self.COMMAND.INIT: "INI X",
106
                self.COMMAND.STOP_ERROR: 2,
107 108 109
            },
            887: {
                self.COMMAND.POSITIONS: "\3",
Geoffrey Mant's avatar
Geoffrey Mant committed
110
                self.COMMAND.MOVE_STATE: ("\5", lambda x: int(x, 16)),
111 112
                self.COMMAND.MOVE_SEP: " ",
                self.COMMAND.INIT: "FRF X",
113
                self.COMMAND.STOP_ERROR: 10,
114 115
            },
        }
116 117 118 119 120 121 122 123 124 125 126 127 128

        self._commands = commands[controler_model]

    def finalize(self):
        if self._cnx is not None:
            self._cnx.close()

    def initialize_axis(self, axis):
        axis.channel = axis.config.get("channel", str)

    def read_position(self, axis):
        return self._read_all_positions()[axis.channel]

129
    @_atomic_communication
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
    def state(self, axis):
        cmd, test_func = self._commands[self.COMMAND.MOVE_STATE]
        moving_flag = test_func(self.command(cmd, 1))
        if moving_flag:
            self._check_error_and_raise()
            return AxisState("MOVING")
        return AxisState("READY")

    def home_state(self, axis):
        # home_search is blocking until the end,
        # so this is called when homing is done;
        # at the end of axis homing, all axes
        # have changed position => do a sync hard
        try:
            return self.state(axis)
        finally:
146
            for axis in self.axes.values():
147 148 149 150 151
                axis.sync_hard()

    def start_one(self, motion):
        self.start_all(motion)

152
    @_atomic_communication
153 154
    def start_all(self, *motions):
        sep = self._commands[self.COMMAND.MOVE_SEP]
155 156 157 158 159 160
        cmd = "MOV " + " ".join(
            [
                "%s%s%g" % (motion.axis.channel, sep, motion.target_pos)
                for motion in motions
            ]
        )
161 162 163 164 165 166
        self.command(cmd)
        self._check_error_and_raise()

    def stop(self, axis):
        self.stop_all()

167
    @_atomic_communication
168 169
    def stop_all(self, *motions):
        self.command("STP")
Geoffrey Mant's avatar
Geoffrey Mant committed
170
        self._check_error_and_raise(ignore_stop=True)
171

172
    def command(self, cmd, nb_line=None, **kwargs):
173 174 175 176
        """
        Send raw command to the controller
        """
        cmd = cmd.strip()
177
        need_reply = cmd.find("?") > -1 if nb_line is None else nb_line
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
178 179
        cmd += "\n"
        cmd = cmd.encode()
180
        if need_reply:
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
181 182 183 184 185
            if nb_line is not None and nb_line > 1:
                return [
                    r.decode()
                    for r in self._cnx.write_readlines(cmd, nb_line, **kwargs)
                ]
186
            else:
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
187 188 189
                return self._cnx.write_readline(cmd, **kwargs).decode()
        else:
            return self._cnx.write(cmd)
190

191
    @_atomic_communication
192 193 194 195 196 197 198
    def home_search(self, axis, switch):
        init_cmd = self._commands[self.COMMAND.INIT]
        self.command(init_cmd)
        self._check_error_and_raise(timeout=30.)

    def _read_all_positions(self):
        cmd = self._commands[self.COMMAND.POSITIONS]
199 200 201 202
        try:
            answer = self.command(cmd, nb_line=6)
        except:
            import traceback
203

204 205
            traceback.print_exc()
            raise
206 207
        positions = dict()
        try:
208
            for channel_name, ans in zip(["%s=" % x for x in "XYZUVW"], answer):
209 210 211 212 213 214 215 216 217
                if not ans.startswith(channel_name):
                    raise RuntimeError("PI_HEXA: error parsing position answer")
                positions[channel_name[0]] = float(ans[2:])
        except:
            self._cnx.flush()
            raise
        else:
            return positions

Geoffrey Mant's avatar
Geoffrey Mant committed
218
    def _check_error_and_raise(self, ignore_stop=False, **kwargs):
219
        err = int(self.command("ERR?", **kwargs))
220
        if err > 0:
bliss administrator's avatar
bliss administrator committed
221 222 223
            if (
                ignore_stop and err == self._commands[self.COMMAND.STOP_ERROR]
            ):  # stopped by user
Geoffrey Mant's avatar
Geoffrey Mant committed
224
                return
225 226 227
            human_error = get_error_str(err)
            errors = [self.name, err, human_error]
            raise RuntimeError("Device {0} error nb {1} => ({2})".format(*errors))