Restructure python code

- restructure ct2 python package
- Add CT2Device concept
- Add tango as a sub-package of ct2
- Update examples according to the new structure
- Update tests according to the new structure
- Add setup.py, manifest, licence and readme
- Update code header
parent fce70000
## CT2 (P201/C208) ESRF counter card
CT2 (P201/C208) ESRF counter card TANGO device
CT2 (P201/C208) ESRF counter card library and TANGO device
## Requirement
- PyTango >= 8.1.6
- devicetest (for using tests)
- python >= 2.6
- sphinx (for building sphinx documentation)
- PyTango >= 8.1.6 (for running the TANGO device server)
## Installation
......@@ -17,10 +17,3 @@ run python setup.py build_sphinx
IUf you want to pass the tests,
run python setup.py test
## Usage
Now you can start your device server in any
Terminal or console by calling it :
CT2 instance_name
# -*- coding: utf-8 -*-
#
# This file is part of the CT2 project
#
# Copyright (c) : 2015
# Beamline Control Unit, European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
"""
The python module for the ct2 (P201/C208) ESRF PCI counter card
"""
from .ct2 import *
from .device import *
from . import release
__version__ = release.version
__version_info__ = release.version_info
__author__ = release.author
__doc__ = release.description
__copyright__ = release.copyright
del release
# -*- coding: utf-8 -*-
#
# This file is part of the CT2 project
#
# Copyright (c) : 2015
# Beamline Control Unit, European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
"""
The python module for the ct2 (P201/C208) ESRF PCI counter card
......@@ -22,8 +33,8 @@ import functools
try:
import enum
except:
import enum34 as enum
except ImportError:
from . import enum34 as enum
# low level pread and pwrite calls for the p201/c208 driver.
......@@ -3248,7 +3259,7 @@ class BaseCard:
self.set_counters_software_enable(ct)
class P201(BaseCard):
class P201Card(BaseCard):
"""
P201 card class
"""
......@@ -3269,16 +3280,16 @@ class P201(BaseCard):
FIFO_SIZE = 2048 * CT2_REG_SIZE
def C208():
def C208Card():
raise NotImplementedError
def ct2(card_type, name):
def CT2Card(card_type, name):
if "201" in card_type:
klass = P201
klass = P201Card
name = name and name or "/dev/p201"
else:
klass = C208
klass = C208Card
name = name and name or "/dev/c208"
return klass(name)
......@@ -3370,7 +3381,7 @@ def create_card_from_configure(config):
:return: a new instance of :class:`P201`
:rtype: :class:`P201`
"""
return ct2(config.get("class", "P201"), config.get("address"))
return CT2Card(config.get("class", "P201"), config.get("address"))
def configure_card(card, config):
......@@ -3497,7 +3508,7 @@ def main():
sys.stdout.write(msg)
sys.stdout.flush()
p201 = P201()
p201 = P201Card()
p201.request_exclusive_access()
p201.reset()
p201.software_reset()
......
# -*- coding: utf-8 -*-
#
# This file is part of the CT2 project
#
# Copyright (c) : 2015
# Beamline Control Unit, European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
import numpy
from gevent import select
from louie import dispatcher
try:
import enum
except:
from . import enum34 as enum
from . import ct2
ErrorSignal = "error"
StopSignal = "stop"
PointNbSignal = "point_nb"
class AcqMode(enum.Enum):
Internal = 0
class BaseCT2Device(object):
internal_timer_counter = 11
internal_point_nb_counter = 12
def __init__(self, config, name):
self.__config = config
self.__name = name
# helper methods to fire events
def _send_error(self, error):
dispatcher.send(ErrorSignal, self, error)
def _send_point_nb(self, point_nb):
dispatcher.send(PointNbSignal, self, point_nb)
def _send_stop(self):
dispatcher.send(StopSignal, self)
@property
def config(self):
return self.__config
@property
def card_config(self):
return self.config.get_config(self.__name)
@property
def name(self):
return self.__name
@property
def _device(self):
raise NotImplementedError
@property
def use_mmap(self):
return self._device.use_mmap
@use_mmap.setter
def use_mmap(self, use_mmap):
self._device.use_mmap = use_mmap
@property
def acq_mode(self):
return self._device.acq_mode
@acq_mode.setter
def acq_mode(self, acq_mode):
self._device.acq_mode = acq_mode
@property
def acq_nb_points(self):
return self._device.acq_nb_points
@acq_nb_points.setter
def acq_nb_points(self, acq_nb_points):
self._device.acq_nb_points = acq_nb_points
@property
def acq_expo_time(self):
return self._device.acq_expo_time
@acq_expo_time.setter
def acq_expo_time(self, acq_expo_time):
self._device.acq_expo_time = acq_expo_time
@property
def acq_channels(self):
return self._device.acq_channels
@acq_channels.setter
def acq_channels(self, acq_channels):
self._device.acq_channels = acq_channels
def reset(self):
self._device.reset()
def prepare_acq(self):
self._device.prepare_acq()
def start_acq(self):
self._device.start_acq()
def apply_config():
self._device.apply_config()
def read_data(self):
self._device.read_data(use_mmap)
@property
def counters(self):
return self._device.counters
@property
def latches(self):
return self._device.latches
class CT2Device(BaseCT2Device):
"""
Helper for a locally installed CT2 card (P201/C208).
"""
def __init__(self, config, name):
BaseCT2Device.__init__(self, config, name)
self.__buffer = []
self.__card = self.config.get(self.name)
self.__acq_mode = AcqMode.Internal
self.__acq_expo_time = 1.0
self.__acq_nb_points = 1
self.__acq_channels = ()
def run_forever(self):
card = self.__card
while True:
read, write, error = select.select((card,), (), (card,))
if error:
self._send_error("ct2 select error on {0}".format(error))
if read:
(counters, channels, dma, fifo_half_full, err), tstamp = \
card.acknowledge_interrupt()
if err:
self._send_error("ct2 error")
if dma:
data, fifo_status = card.read_fifo()
self.__buffer.append(data)
point_nb = data[-1][-1]
self._send_point_nb(point_nb)
if self.__acq_mode == AcqMode.Internal:
if self.internal_point_nb_counter in counters:
self._send_stop()
@property
def _device(self):
return self
@property
def card(self):
return self.__card
def reset(self):
self.card.software_reset()
self.card.reset()
self.__buffer = []
def __configure_internal_mode(self):
card = self.__card
timer_ct = self.internal_timer_counter
point_nb_ct = self.internal_point_nb_counter
timer_inc_stop = getattr(ct2.CtClockSrc, 'INC_CT_{0}_STOP'.format(timer_ct))
timer_stop_source = getattr(ct2.CtHardStopSrc, 'CT_{0}_EQ_CMP_{0}'.format(timer_ct))
point_nb_stop_source = getattr(ct2.CtHardStopSrc, 'CT_{0}_EQ_CMP_{0}'.format(point_nb_ct))
point_nb_gate = getattr(ct2.CtGateSrc, 'CT_{0}_GATE_ENVELOP'.format(point_nb_ct))
point_nb_start_source = getattr(ct2.CtHardStartSrc, 'CT_{0}_START'.format(point_nb_ct))
# configure counter 11 as "timer"
ct_config = ct2.CtConfig(clock_source=ct2.CtClockSrc.CLK_100_MHz,
gate_source=point_nb_gate,
hard_start_source=point_nb_start_source,
hard_stop_source=timer_stop_source,
reset_from_hard_soft_stop=True,
stop_from_hard_stop=False)
card.set_counter_config(timer_ct, ct_config)
card.set_counter_comparator_value(timer_ct, int(self.acq_expo_time * 1E8))
# configure counter 12 as "nb. points"
ct_config = ct2.CtConfig(clock_source=timer_inc_stop,
gate_source=ct2.CtGateSrc.GATE_CMPT,
hard_start_source=ct2.CtHardStartSrc.SOFTWARE,
hard_stop_source=point_nb_stop_source,
reset_from_hard_soft_stop=True,
stop_from_hard_stop=True)
card.set_counter_config(point_nb_ct, ct_config)
card.set_counter_comparator_value(point_nb_ct, self.acq_nb_points)
# dma transfer and error will trigger DMA; also counter 12 stop
# should trigger an interrupt (this way we know that the
# acquisition has finished without having to query the
# counter 12 status)
card.set_interrupts(counters=(point_nb_ct,), dma=True, error=True)
# make master enabled by software
card.set_counters_software_enable((timer_ct, point_nb_ct))
# ... and now for the slave channels
channels = tuple(self.acq_channels)
all_channels = channels + (timer_ct, point_nb_ct)
# change the start and stop sources for the active channels
for ch_nb in channels:
ct_config = card.get_counter_config(ch_nb)
ct_config.hard_start_source = point_nb_start_source
ct_config.hard_stop_source = timer_stop_source
card.set_counter_config(ch_nb, ct_config)
# counter 11 will latch all active counters/channels
latch_sources = dict([(ct, timer_ct) for ct in all_channels])
card.set_counters_latch_sources(latch_sources)
# counter 11 counter-to-latch signal will trigger DMA; at each DMA
# trigger, all active counters (including counters 11 (timer)
# and 12 (point_nb)) are stored to FIFO
card.set_DMA_enable_trigger_latch((timer_ct,), all_channels)
card.set_counters_software_enable(channels)
def apply_config(self):
configure_card(self.card, self.card_config)
def prepare_acq(self):
if self.acq_mode == AcqMode.Internal:
self.__configure_internal_mode()
else:
raise NotImplementedError
def start_acq(self):
if self.acq_mode == AcqMode.Internal:
counters = self.internal_timer_counter, self.internal_point_nb_counter
self.card.set_counters_software_start(counters)
else:
raise NotImplementedError
def trigger_latch(self, counters):
self.card.trigger_counters_software_latch(counters)
@property
def acq_mode(self):
return self.__acq_mode
@acq_mode.setter
def acq_mode(self, acq_mode):
self.__acq_mode = AcqMode(acq_mode)
@property
def acq_nb_points(self):
return self.__acq_nb_points
@acq_nb_points.setter
def acq_nb_points(self, acq_nb_points):
self.__acq_nb_points = acq_nb_points
@property
def acq_expo_time(self):
return self.__acq_expo_time
@acq_expo_time.setter
def acq_expo_time(self, acq_expo_time):
self.__acq_expo_time = acq_expo_time
@property
def acq_channels(self):
return self.__acq_channels
@acq_channels.setter
def acq_channels(self, acq_channels):
self.__acq_channels = acq_channels
@property
def counters(self):
return self.card.get_counters_values()
@property
def latches(self):
return self.card.get_latches_values()
def read_data(self):
b = self.__buffer
if b:
self.__buffer = []
data = numpy.vstack(b)
else:
data = numpy.array([[]], dtype=numpy.uint32)
# -*- coding: utf-8 -*-
#
# This file is part of the CT2 project
#
# Copyright (c) : 2015
# Beamline Control Unit, European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
"""Release information for CT2 project"""
name = """CT2"""
package_name = """ct2"""
version = "0.2.0"
version_info = version.split(".")
description = """The ESRF CT2 (P201/C208) counter card project"""
license = """LGPL"""
author = "T. Coutinho, A. Homs and BCU team (ESRF)"
url = """http://gitlab.esrf.fr/driver/p201"""
org_name = """Beamline Control Unit"""
org_abbrev = """BCU"""
org_url = """www.blissgarden.org"""
copyright = """\
Copyright (c) : 2015
Beamline Control Unit, European Synchrotron Radiation Facility
BP 220, Grenoble 38043
FRANCE"""
# -*- coding: utf-8 -*-
#
# This file is part of the CT2 project
#
# Copyright (c) : 2015
# Beamline Control Unit, European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
# -*- coding: utf-8 -*-
#
# This file is part of the CT2 project
#
# Copyright (c) : 2015
# Beamline Control Unit, European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
import numpy
import PyTango.gevent
from ..device import BaseCT2Device, AcqMode
PyTango.requires_pytango('8.1.8')
class CT2Device(BaseCT2Device):
"""
Helper for a remote TANGO device CT2 card (P201/C208).
"""
def __init__(self, config, name):
BaseCT2Device.__init__(self, config, name)
device_name = self.card_config['tango name']
self.__tango_device = PyTango.gevent.DeviceProxy(device_name)
self.__tango_device.subscribe_event("data",
PyTango.EventType.DATA_READY_EVENT,
self.__on_point_nb)
self.__tango_device.subscribe_event("last_error",
PyTango.EventType.CHANGE_EVENT,
self.__on_error)
def __on_point_nb(self, event):
point_nb = event.ctr
if point_nb < 0:
self._send_stop()
else:
self._send_point_nb(point_nb)
def __on_error(self, event):
self._send_error(event.attr_value.value)
@property
def _device(self):
return self.__tango_device
@property
def acq_mode(self):
return AcqMode(self._device.acq_mode)
@acq_mode.setter
def acq_mode(self, acq_mode):
self._device.acq_mode = AcqMode(acq_mode).name
def apply_config(self):
self.card_config.save()
BaseCT2Device.apply_config(self)
def read_data(self):
data = Ct2Device.read_data(self)
if data is None:
data = numpy.array([[]], dtype=numpy.uint32)
return data
# -*- coding: utf-8 -*-
#
# This file is part of the CT2 project
#
# Copyright (c) : 2015
# Beamline Control Unit, European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
"""CT2 (P201/C208) ESRF counter card
CT2 (P201/C208) ESRF counter card TANGO device
"""
__all__ = ["CT2", "main"]
import PyTango
from PyTango.server import Device, DeviceMeta
from PyTango.server import attribute, command
from PyTango.server import class_property, device_property
from PyTango import AttrQuality, AttrWriteType, DispLevel, DevState
import gevent
from gevent import select
from louie import dispatcher
from beacon.static import get_config
from ...device import CT2Device, AcqMode, ErrorSignal, PointNbSignal, StopSignal
def switch_state(tg_dev, state=None, status=None):
"""Helper to switch state and/or status and send event"""
if state is not None:
tg_dev.set_state(state)
tg_dev.push_change_event("state")
if state in (DevState.ALARM, DevState.UNKNOWN, DevState.FAULT):
msg = "State changed to " + str(state)
if status is not None:
msg += ": " + status
#logging.getLogger(tg_dev.get_name()).error(msg)
if status is not None:
tg_dev.set_status(status)
tg_dev.push_change_event("status")
class CT2(Device):
"""
CT2 (P201/C208) ESRF counter card TANGO device
"""
__metaclass__ = DeviceMeta
card_name = device_property(dtype='str', default_value="p201")
def __init__(self, *args, **kwargs):
Device.__init__(self, *args, **kwargs)
def init_device(self):
Device.init_device(self)
for attr in ("state", "status", "last_error", "acq_mode",
"acq_expo_time", "acq_nb_points", "acq_channels"):
self.set_change_event(attr, True, False)
attr_map = self.get_device_attr()
data_attr = attr_map.get_attr_by_name("data")
data_attr.set_data_ready_event(True)
self.__last_error = ""