Commit 1c4347fa authored by Manuel Perez's avatar Manuel Perez

Add first working IcePAP trajectory controller and its test script

parent f457b509
This diff is collapsed.
"""Handle IcePAP devices"""
__all__ = ['system', 'groups', 'axis', 'types']
__all__ = ['system', 'groups', 'axis', 'types', 'trajectories']
from system import *
from groups import *
from axis import *
from types import *
# really needed?
#from deep import *
#from deep.device import *
#from deep.cmd import *
from system import *
from groups import *
from axis import *
from types import *
from trajectories import *
......@@ -109,10 +109,21 @@ def status_ismoving(stat):
# Any motion in progress
return False
def status_set_ismoving(stat, val=True):
return status_set_bit(stat, 10, val)
def status_set_isready(stat, val=True):
return status_set_bit(stat, 9, val)
def status_set_bit(stat, bit, val):
if val:
return (stat | (1<<bit)) & ((1<<32)-1)
else:
return (stat & ~(1<<bit)) & ((1<<32)-1)
def status_isready(stat):
"""
Returns True if the axis status given indicates
Returns True if the axis status given indicates
that the axis is ready to move
"""
......@@ -121,7 +132,7 @@ def status_isready(stat):
def status_lowlim(stat):
"""
Returns True if the axis status given indicates
Returns True if the axis status given indicates
a low limitswitch active
"""
......@@ -145,6 +156,8 @@ def status_home(stat):
return ((stat & (1<<20)) != 0)
#-------------------------------------------------------------------------
# Inteface function
#
......@@ -212,8 +225,8 @@ class Axis(object):
self._used_in_groups = []
#
self._commands = globals._known_commandlist[hostname]
self._commands = globals._known_commandlist[hostname]
# Update the library global resource
globals._known_axis[self._name] = self
......@@ -284,7 +297,7 @@ class Axis(object):
# Minimum check on command syntax
str_cmd = string.strip(str_cmd)
if str_cmd[0] == "#":
str_cmd = str_cmd[1:]
str_cmd = str_cmd[1:]
return self._system.ackcommand(self._addrprefix + str_cmd, in_data)
def status(self):
......@@ -303,7 +316,7 @@ class Axis(object):
#
def _parse_flags(flags):
"""
Parse the string with syntax "param=value..." and
Parse the string with syntax "param=value..." and
returns a dictionary
"""
......
......@@ -39,6 +39,10 @@ def name_to_group(name):
except:
raise ValueError("invalid group name \"%s\""%name)
def group_exists(name):
return name in globals._known_groups
#-------------------------------------------------------------------------
# Inteface function
#
......
This diff is collapsed.
"""IcePAP library"""
#-------------------------------------------------------------------------
# Standard modules
#
import string
import struct
import numpy
#-------------------------------------------------------------------------
# Library modules
#
import deep.log as log
#-------------------------------------------------------------------------
# Global definitions
#
PARAMETER = 'P'
POSITION = 'A'
SLOPE = 'S'
BYTE = 'BYTE'
WORD = 'WORD'
DWORD = 'DWORD'
LWORD = 'LWORD'
FLOAT = 'FLOAT'
DFLOAT = 'DFLOAT'
UBYTE = 'UBYTE'
UWORD = 'UWORD'
UDWORD = 'UDWORD'
ULWORD = 'ULWORD'
ADDRUNSET = 0xFF
#-------------------------------------------------------------------------
# Class definition
#
class vdata(object):
"""IcePAP data vector
# object creation
v = vdata()
# appending data
data = list(...)
addr = 3
type = vdata.PARAMETER | POSITION | SLOPE
v.append(data, addr, type)
v.append(data, addr, type, format=vdata.DWORD)
v.append(data, addr, type, incremental=True)
"""
"""Private menbers"""
__signature = 0xCAFE
__incremental = 0x8000
__type_code = {
PARAMETER:0x1000,
POSITION :0x2000,
SLOPE :0x4000
}
__type_desc = {
PARAMETER:'PARAMETER',
POSITION :'POSITION',
SLOPE :'SLOPE'
}
__format_code = {
BYTE :0x00,
WORD :0x01,
DWORD :0x02,
LWORD :0x03,
FLOAT :0x04,
DFLOAT :0x05,
UBYTE :0x10,
UWORD :0x11,
UDWORD :0x12,
ULWORD :0x13
}
__format_dtype = {
BYTE :'b',
WORD :'h',
DWORD :'l',
LWORD :'q',
FLOAT :'f',
DFLOAT :'d',
UBYTE :'B',
UWORD :'H',
UDWORD :'L',
ULWORD :'Q'
}
__header_format = '<HBBLLBBHd'
"""
from DSP source code:
typedef struct {
uint16_t signature; // Signature must be VDAT_SIGNATURE (0xCAFE)
uint16_t offset_vers; // High byte: data offset in dwords = 6 (0x0018)
// Low byte: version of the data vector format (0)
uint32_t v_size; // full vector size in dwords
uint32_t n_values; // number of values in the vector
uint16_t compr_dtype; // high byte: compression algorithm:
// 0=uncompressed, 1=lzapp
// low byte: data type
uint16_t flags_addr; // coding flags and board address
extdfloat_t firstvalue; // first vector value if incremental
} vdatheader_t;
"""
def __init__(self):
"""Object constructor"""
self._bytearray = bytearray(0)
def append(self, data, addr, type, format=DWORD, incremental=False):
"""Append data to current"""
# Minimum checks
if((addr < 0) or (addr > 255)):
raise ValueError("invalid address, must be [0:255]")
if(len(data) > 0xFFFF):
raise ValueError("too many data values, max: 0xFFFF")
if(type not in self.__type_code):
raise ValueError("invalid type specified")
if(format not in self.__format_code):
raise ValueError("invalid format specified")
#
dtype = self.__format_code[format]
flags = self.__type_code[type] + addr
dformat = '<' + str(len(data)) + self.__format_dtype[format]
if incremental:
firstval = data[0]
data = [0 if i == 0 else \
data[i]-data[i-1] for i in range(len(data))]
flags |= self.__incremental
else:
firstval = 0
header_size = struct.calcsize(self.__header_format)
full_size = header_size + struct.calcsize(dformat)
if full_size % 4: full_size += 4 - (full_size % 4)
#print("header_size", header_size)
#print("full_size", full_size)
#print("data format", dformat)
# build the byte array
bin_column = bytearray(full_size)
struct.pack_into(self.__header_format, bin_column, 0,
self.__signature, # vdata signature
0, # Version = 0
header_size / 4, # Data offset in dwords
full_size / 4, # Full vector size in dwords
len(data), # number of values in the vector
dtype, # Data type
0, # no compression
flags, # format + address
firstval # first data value for incremental coding
)
struct.pack_into(dformat, bin_column, header_size, *data)
# append the byte array
self._bytearray += bin_column
def bin(self):
"""Return an IcePAP binary compatible block"""
return numpy.array(self._bytearray, dtype=numpy.int8)
def type_to_str(self, flags):
"""Returns a string describing an encoded vector type"""
flags &= 0xff00
for type in self.__type_code:
if flags == self.__type_code[type]:
return self.__type_desc[type]
return 'unknown'
def addr_to_str(self, flags):
"""Returns the destination address from an encoded vector header"""
return flags & 0xff
def loginfo(self):
"""Print out on log information about the data vector"""
# brute dump of bytes
# TODO: replace the test by a log.istrace()
if log.level() >= log.DBG_DATA:
log.trace("data vector contains:")
n = self.bin()
print ' '.join('0x{0:02x}'.format(x & 0xff) for x in n)
# minimum check
header_size = struct.calcsize(self.__header_format)
if len(self._bytearray) < header_size:
return
# loop over all slices of data vector
idx = 0
cpt = 1
while len(self._bytearray[idx:]) >= header_size:
# extract header information
(
signature, version, header_sz, full_sz,
data_len, data_type, compression,
flags, firstval
) = struct.unpack_from(
self.__header_format,
bytes(self._bytearray[idx:]))
# minium check
if signature != self.__signature:
raise ValueError('corrupted data vector, missing signature')
# convert size from DWORD to bytes
full_sz *= 4
#
log.trace("#%d: data vector type: %s" %
(cpt, self.type_to_str(flags)))
log.trace("#%d: destination addr: %d" %
(cpt, self.addr_to_str(flags)))
log.trace("#%d: number of data : %d" %
(cpt, data_len))
log.trace("#%d: data vector size: %dbytes" %
(cpt, full_sz))
# jump to next slice
idx += full_sz
cpt += 1
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment