Commit 2856e3d3 authored by Lucas Felix's avatar Lucas Felix
Browse files

Add generic ClosedLoop to axis

parent ba1aef8d
......@@ -24,6 +24,7 @@ from bliss.common.logtools import log_debug, user_print, log_warning
from bliss.common.utils import rounder
from bliss.common.utils import autocomplete_property
from bliss.comm.exceptions import CommunicationError
from bliss.common.closed_loop import ClosedLoop
import enum
import gevent
......@@ -677,6 +678,10 @@ class Axis(Scannable, HasMetadataForDataset):
self._lock = gevent.lock.Semaphore()
self.__positioner = True
self._disabled = False
if config.get("closed_loop"):
self._closed_loop = ClosedLoop(self)
else:
self._closed_loop = None
try:
config.parent
......@@ -834,6 +839,14 @@ class Axis(Scannable, HasMetadataForDataset):
def backlash(self, backlash):
self.settings.set("backlash", backlash)
@property
@lazy_init
def closed_loop(self):
"""
Closed loop object associated to axis.
"""
return self._closed_loop
@property
def tolerance(self):
"""Current Axis tolerance in dial units (:obj:`float`)"""
......@@ -1268,7 +1281,16 @@ class Axis(Scannable, HasMetadataForDataset):
except Exception:
info_string += "ERROR: Unable to get encoder info\n"
else:
info_string += "ENCODER:\n None\n"
info_string += "ENCODER:\n None\n\n"
# CLOSED_LOOP
if self.closed_loop is not None:
try:
info_string += info(self.closed_loop)
except Exception:
info_string += "ERROR: Unable to get closed loop info\n"
else:
info_string += "CLOSED LOOP:\n None\n"
return info_string
......@@ -2262,6 +2284,9 @@ class Axis(Scannable, HasMetadataForDataset):
if reload:
self.config.reload()
if self._closed_loop is not None:
self._closed_loop.apply_config(reload)
if self.encoder is not None:
self.encoder.apply_config(reload)
......
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2022 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
import enum
from functools import partial
from bliss.config.beacon_object import BeaconObject, EnumProperty
class ClosedLoopState(enum.Enum):
UNKNOWN = enum.auto()
ON = enum.auto()
OFF = enum.auto()
def fget_gen(key):
def f(key, self):
return self.axis.controller.get_closed_loop_param(self.axis, key)
fget = partial(f, key)
fget.__name__ = key
return fget
def fset_gen(key):
def f(key, self, value):
if self._setters_on:
return self.axis.controller.set_closed_loop_param(self.axis, key, value)
fset = partial(f, key)
fset.__name__ = key
return fset
class ClosedLoop(BeaconObject):
"""
config example:
- name: m1
steps_per_unit: 1000
velocity: 50
acceleration: 1
encoder: $m1enc
closed_loop:
state: on
kp: 1
ki: 2
kd: 3
settling_window: 0.1
settling_time: 3
"""
def __new__(cls, axis):
"""Make a class copy per instance to allow closed loop objects to own different properties"""
cls = type(cls.__name__, (cls,), {})
return object.__new__(cls)
def __init__(self, axis):
self._axis = axis
name = f"{axis.name}:closed_loop"
config = axis.config.config_dict
if isinstance(config, dict):
super().__init__(config.get("closed_loop"), name=name)
else:
super().__init__(config, name=name, path=["closed_loop"])
setattr(
self.__class__,
"_state",
EnumProperty("state", ClosedLoopState, must_be_in_config=True),
)
self._setters_on = False
self._init_properties()
def _init_properties(self):
"""Instantiate properties depending on the controller requirements"""
reqs = self.axis.controller.get_closed_loop_requirements()
for key in reqs:
if hasattr(self, key):
raise Exception(
f"Cannot create closed loop property '{key}', name already exists"
)
setattr(
self.__class__,
key,
BeaconObject.property(
fget=fget_gen(key), fset=fset_gen(key), must_be_in_config=True
),
)
def __info__(self):
info_str = "CLOSED LOOP:\n"
info_str += f" state: {self.state.name}\n"
for key in self.axis.controller.get_closed_loop_requirements():
info_str += f" {key}: {getattr(self, key)}\n"
return info_str
@property
def axis(self):
return self._axis
@property
def state(self):
return self._state
def _activate(self, onoff):
new_state = self.axis.controller.activate_closed_loop(self.axis, onoff)
if not isinstance(new_state, ClosedLoopState):
raise ValueError(
f"Controller expected to return ClosedLoopState, got {new_state}"
)
else:
self._state = new_state
def on(self):
self._activate(True)
def off(self):
self._activate(False)
def _activate_setters(self):
self._setters_on = True
......@@ -136,7 +136,15 @@ class EnumProperty(_Property):
doc: Documentation
"""
def __init__(self, name, enum_type, unknown_value=None, default=None, doc=None):
def __init__(
self,
name,
enum_type,
unknown_value=None,
default=None,
doc=None,
must_be_in_config=False,
):
def fget(self):
return self.settings.get(name, default)
......@@ -162,7 +170,12 @@ class EnumProperty(_Property):
return result
_Property.__init__(
self, fget=fget, fset=fset, doc=doc, set_unmarshalling=set_unmarshalling
self,
fget=fget,
fset=fset,
doc=doc,
set_unmarshalling=set_unmarshalling,
must_be_in_config=must_be_in_config,
)
......
......@@ -21,6 +21,7 @@ import bliss.common.motor_group as motor_group
from bliss.common.motor_config import MotorConfig
from bliss.common.motor_settings import ControllerAxisSettings, floatOrNone
from bliss.common.axis import Trajectory
from bliss.common.closed_loop import ClosedLoopState
from bliss.common import event
from bliss.controllers.counter import SamplingCounterController
from bliss.physics import trajectory
......@@ -353,6 +354,12 @@ class Controller(BlissController):
self.initialize_hardware_axis(axis)
axis.settings.check_config_settings()
axis.settings.init() # get settings, from config or from cache, and apply to hardware
if axis.closed_loop:
if axis.closed_loop.state != self.get_closed_loop_state(axis):
self.activate_closed_loop(
axis, axis.closed_loop.state == ClosedLoopState.ON
)
axis.closed_loop._activate_setters()
axis_initialized.value = 1
except BaseException:
......@@ -383,6 +390,22 @@ class Controller(BlissController):
def finalize_axis(self, axis):
raise NotImplementedError
def get_closed_loop_requirements(self):
"""Return the list of keys this controller expects in a closed loop config"""
raise NotImplementedError
def get_closed_loop_state(self, axis):
raise NotImplementedError
def activate_closed_loop(self, axis, onoff=True):
raise NotImplementedError
def set_closed_loop_param(self, axis, param, value):
raise NotImplementedError
def get_closed_loop_param(self, axis, param):
raise NotImplementedError
def get_class_name(self):
return self.__class__.__name__
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment