linked.py 6.55 KB
Newer Older
1 2 3 4 5 6 7
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2017 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.

8 9 10 11 12 13 14
from bliss.common.axis import (
    Axis,
    NoSettingsAxis,
    DEFAULT_POLLING_TIME,
    lazy_init,
    Motion,
)
15
from . import _ackcommand, _command
16
import gevent
17

18 19 20 21

class LinkedAxis(Axis):
    def __init__(self, name, controller, config):
        Axis.__init__(self, name, controller, config)
22 23
        if config.get("address") is None:
            self.config.set("address", name)
24 25 26 27 28 29
        self.__config = config.deep_copy()
        self.real_axes = list()

    def _init_hardware(self):
        linked_axis = self.controller.get_linked_axis()
        if linked_axis.get(self.address) is None:
30 31 32 33 34
            raise RuntimeError(
                "Linked axis named %s doesn't exist"
                "linked axis configure in the system are : %s"
                % (self.address, linked_axis.keys())
            )
35 36

    def _init_software(self):
37
        # check if real motors are also defined in the config
38 39 40
        linked_axis = self.controller.get_linked_axis()
        mot_addresses = linked_axis.get(self.address)

41 42 43 44 45 46 47
        for name, axis in self.controller.axes.iteritems():
            if axis.config.get("address", lambda x: x) in mot_addresses:
                raise RuntimeError(
                    "Cannot initialize linked axis '%s',"
                    " real axis '%s' found in controller configuration"
                    % (self.name, axis.name)
                )
48 49

        for address in mot_addresses:
50 51 52 53 54 55 56 57
            mot_name = _command(self.controller._cnx, "%d:?NAME" % address)
            config_dict = {
                "autopower": False,
                "steps_per_unit": self.steps_per_unit,
                "acceleration": self.acceleration,
                "velocity": self.velocity,
            }
            real_motor = NoSettingsAxis(mot_name, self.controller, config_dict)
58 59 60
            real_motor.address = address
            real_motor.no_offset = True
            self.controller._Controller__initialized_axis[real_motor] = True
61
            setattr(self, mot_name, real_motor)
62 63 64 65 66 67 68 69
            self.real_axes.append(real_motor)

    @property
    @lazy_init
    def real_motor_names(self):
        return [x.name for x in self.real_axes]

    @lazy_init
70
    def sync(self, position):
71 72 73 74 75 76 77 78 79 80 81 82 83
        """
        Synchronizes all real linked axes members of the given virtual axis
        to the give position. No motion will take place.
        The position is given in user units of the virtual axis.
        """
        dial_position = self.user2dial(position)
        for slave_axis in self.real_axes:
            slave_axis.position(dial_position)

        self.acceleration(self.acceleration())
        self.velocity(self.velocity())
        # Reset control encoder
        # TODO: if any?
84
        _ackcommand(self.controller._cnx, "CTRLRST %s" % self.address)
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
        # switch power on (should re-enable the closed loop)
        self.on()
        return self.position(position)

    @lazy_init
    def get_info(self):
        """
        Return information about the given virtual axis configured 
        as a motor in the current SPEC session.
        The list of real axes linked together should not be configured 
        but can be accessed through dedicated commands (linkedmv, linkedhome, etc).
        """

        cnx = self.controller._cnx
        linked_axis = self.controller.get_linked_axis()

        r = "Virtual axis          : %s\n" % self.name
102 103
        r += "POWER                 : %s\n" % _command(cnx, "?POWER %s" % self.address)
        real_axes = linked_axis.get(self.address, list())
104
        r += "Real linked axes      : %r\n" % real_axes
105
        position = int(_command(cnx, "?POS %s" % self.address))
106 107 108 109
        r += "Indexer steps         : %s\n" % position
        r += "Indexer in user unit  : %s\n" % (position / self.steps_per_unit)

        for add in real_axes:
110
            pre_cmd = "%s:" % add
111
            r += "\n"
112 113
            r += "Real axis             : %s\n" % _command(cnx, pre_cmd + "?NAME")
            has = "NO" if _command(cnx, pre_cmd + "?CFG HOMESRC") == "NONE" else "YES"
114
            r += "Home switch           : %s\n" % has
115 116 117 118 119
            r += (
                "Homing                : %s\n"
                % _command(cnx, pre_cmd + "?HOMESTAT").split()[0]
            )
            has = "NO" if _command(cnx, pre_cmd + "?CFG CTRLENC") == "NONE" else "YES"
120
            r += "Control encoder       : %s\n" % has
121
            pos = int(_command(cnx, pre_cmd + "?POS"))
122
            r += "Indexer steps         : %s\n" % pos
123
            measure = int(_command(cnx, pre_cmd + "?POS MEASURE"))
124 125
            r += "Encoder steps         : %s\n" % measure
            try:
126
                homepos = _command(cnx, pre_cmd + "?HOMEPOS")
127 128 129 130 131
            except RuntimeError:
                homepos = "unavailable"
            r += "Homepos steps         : %s\n" % homepos
            r += "Indexer user unit     : %s\n" % (pos / self.steps_per_unit)
            r += "Encoder user unit     : %s\n" % (measure / self.steps_per_unit)
132
            r += "Closed loop           : %s\n" % _command(cnx, pre_cmd + "?PCLOOP")
133 134 135
        return r

    @lazy_init
136
    def home(self, switch=1, wait=True, polling_time=DEFAULT_POLLING_TIME):
137
        with self._lock:
138
            if self.is_moving:
139
                raise RuntimeError("axis %s state is %r" % (self.name, "MOVING"))
140 141

            # create motion object for hooks
142
            motion = Motion(self, switch, None, "homing")
143
            self._Axis__execute_pre_move_hook(motion)
144

145 146
            def start_one(controller, motions):
                cnx = controller._cnx
147 148 149 150 151 152 153 154 155
                home_dir = "+1" if motions[0].target_pos > 0 else "-1"
                cmd = "#HOME STRICT %s" % " ".join(
                    (str(rm.address) + " " + home_dir for rm in self.real_axes)
                )
                _command(
                    cnx,
                    cmd,
                    pre_cmd="#DISPROT ALL %s ; "
                    % " ".join((str(rm.address) for rm in self.real_axes)),
156
                )
157 158
                # IcePAP status is not immediately MOVING after home search command is sent
                gevent.sleep(0.2)
159

160 161
            def stop_one(controller, motions):
                controller.stop(motions[0].axis)
162

163 164 165 166 167 168 169 170
            self._group_move.move(
                {self.controller: [motion]},
                start_one,
                stop_one,
                "_wait_home",
                wait=False,
                polling_time=polling_time,
            )
171

172 173
        if wait:
            self.wait_move()