Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
ID01
id01
Commits
24bee068
Commit
24bee068
authored
Feb 20, 2020
by
Roberto Arturo Homs-Regojo
Browse files
moved nd287 from bliss to id01
parent
b1f3324e
Changes
1
Hide whitespace changes
Inline
Side-by-side
id01/controllers/nd287.py
0 → 100644
View file @
24bee068
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
# Copyright (c) 2015-2019 Beamline Control Unit, ESRF
# Distributed under the GNU LGPLv3. See LICENSE for more info.
"""HEIDENHAIN ND287 2-channels encoder controller
Example YAML_ controller configuration:
.. code-block:: yaml
plugin: bliss
class: ND287
name: nd287
tcp:
url: nd287id011
counters:
-
counter_name: ndenc1
channel: 1
factor: 1
-
counter_name: ndenc2
channel: 2
factor: 1
Example YAML_ session configuration with the counters
exported with alias:
.. code-block:: yaml
name: rh
class: Session
config-objects:
- nd287
aliases:
- original_name: nd287:ndenc1
alias_name: ndenc1
export_to_globals: True
- original_name: nd287:ndenc2
alias_name: ndenc2
export_to_globals: True
Usage::
"""
import
re
import
time
import
numpy
as
np
from
bliss.comm.util
import
get_comm
,
TCP
from
bliss.common
import
session
from
bliss.common.logtools
import
*
from
bliss.controllers.counter
import
counter_namespace
,
SamplingCounterController
from
bliss.common.counter
import
SamplingCounter
from
bliss.common.counter
import
SamplingMode
class
ND287enc
(
SamplingCounter
):
def
__init__
(
self
,
name
,
controller
,
channel
=
1
):
SamplingCounter
.
__init__
(
self
,
name
,
controller
)
# ref to the controller
# the reading of many counters depending on the same
# controller will be performed using controller.read_all() function
self
.
channel
=
channel
self
.
factor
=
controller
.
factor
[
channel
]
def
__info__
(
self
):
info_string
=
(
f
"ND287 counter[
{
self
.
name
}
] ch[
{
self
.
channel
}
] factor[
{
self
.
factor
}
]"
)
return
info_string
class
ND287
(
SamplingCounterController
):
def
__init__
(
self
,
name
,
config
):
SamplingCounterController
.
__init__
(
self
,
name
)
# self.name = name
self
.
bliss_config
=
config
self
.
factor
=
{}
# port number is fixed for this device.
self
.
comm
=
get_comm
(
config
,
TCP
,
eol
=
"
\n
"
,
port
=
1050
)
self
.
comm_id
=
0
self
.
counters_list
=
list
()
for
counter_conf
in
config
.
get
(
"counters"
,
list
()):
cnt_name
=
counter_conf
[
"counter_name"
]
if
not
(
"channel"
in
counter_conf
):
raise
RuntimeError
(
f
"ND287 missing parameter in counter[
{
cnt_name
}
] - [channel]"
)
else
:
try
:
ch
=
int
(
counter_conf
[
"channel"
])
except
ValueError
:
raise
RuntimeError
(
f
"ND287 invalid parameter in counter[
{
cnt_name
}
] - channel[
{
counter_conf
[
'channel'
]
}
]"
)
if
(
ch
<
1
)
or
(
ch
>
2
):
raise
RuntimeError
(
f
"ND287 invalid value in counter[
{
cnt_name
}
] - channel[
{
ch
}
]"
)
if
not
(
"factor"
in
counter_conf
):
factor
=
1.0
else
:
try
:
factor
=
float
(
counter_conf
[
"factor"
])
except
ValueError
:
raise
RuntimeError
(
"ND287 invalid parameter in counter[{cnt_name}] - [factor]"
)
self
.
factor
[
ch
]
=
factor
counter
=
ND287enc
(
cnt_name
,
self
,
ch
)
counter
.
mode
=
SamplingMode
.
SINGLE
# self.counters_list.append(counter)
self
.
_counters
[
counter
.
name
]
=
counter
# @property
# def counters(self):
# return counter_namespace(self.counters_list)
def
__info__
(
self
):
"""
Return info string used by BLISS shell.
"""
ver
=
self
.
get_version
()
info_string
=
"ND287 controller info:
\n
"
info_string
+=
"- FW ver [0x%08x][%d]
\n
"
%
(
ver
,
ver
)
# info_string += "\n".join(self.get_info())
log_info
(
self
,
info_string
)
return
info_string
def
read_all
(
self
,
*
counters
):
vlist
=
list
()
# Counters filling.
for
counter
in
counters
:
ch
=
counter
.
channel
if
(
type
(
ch
)
==
type
(
1
))
and
(
ch
>=
1
)
and
(
ch
<=
2
):
vlist
.
append
(
self
.
read_pos
(
ch
))
else
:
vlist
.
append
(
-
1
)
return
vlist
def
read_pos
(
self
,
ch
):
_id
=
self
.
get_new_id
()
cmd
=
(
self
.
int_to_bytes
(
0x501
,
np
.
uint32
)
# 4byte cmd
+
self
.
int_to_bytes
(
13
,
np
.
uint32
)
# 4byte lg frame
+
self
.
int_to_bytes
(
_id
,
np
.
uint32
)
# 4byte id
+
self
.
int_to_bytes
(
ch
,
np
.
uint8
)
# 1byte channel
)
self
.
write_raw_bytes
(
cmd
)
ans
=
self
.
read_raw_all_bytes
()
rlen
=
self
.
get_header
(
ans
)
status
=
self
.
bytes_to_int
(
ans
,
12
,
np
.
uint16
)
if
not
(
status
&
1
):
log_error
(
self
,
"ND287 ERROR ch[%d] status[0x%04x]
\n
"
%
(
ch
,
status
))
# [status] [LSW pos] [ MSL pos ]
# 4b 4b 16b
lsw
=
self
.
bytes_to_int
(
ans
,
14
,
np
.
uint16
)
msl
=
self
.
bytes_to_int
(
ans
,
16
,
np
.
uint32
)
pos0
=
(
msl
<<
16
)
+
lsw
factor
=
1.0e-4
*
self
.
factor
[
ch
]
neg_mask
=
0x1
<<
(
6
*
8
-
1
)
# MSbit of 6 bytes
neg_off
=
neg_mask
<<
1
if
pos0
&
neg_mask
:
pos
=
np
.
float
(
factor
*
(
pos0
-
neg_off
))
else
:
pos
=
np
.
float
(
factor
*
pos0
)
return
pos
def
get_new_id
(
self
):
self
.
comm_id
=
(
self
.
comm_id
+
1
)
&
0x7fffffff
if
self
.
comm_id
==
0
:
self
.
comm_id
=
1
return
self
.
comm_id
def
get_header
(
self
,
ans
):
rcmd
=
self
.
bytes_to_int
(
ans
,
0
,
np
.
uint32
)
rlen
=
self
.
bytes_to_int
(
ans
,
4
,
np
.
uint32
)
rid
=
self
.
bytes_to_int
(
ans
,
8
,
np
.
uint32
)
if
(
not
(
rcmd
&
1
<<
(
4
*
8
-
1
)))
or
(
rcmd
&
1
<<
(
4
*
8
-
2
)):
log_error
(
self
,
"ND287 HEADER ERROR rcmd[0x%08x] rlen[%d]"
%
(
rcmd
,
rlen
))
log_debug
(
self
,
"HEADER rcmd[0x%08x][0x%08x] rlen[%d] rid[%d]
\n
"
%
(
rcmd
,
rcmd
&
0x7fffffff
,
rlen
,
rid
),
)
return
(
rcmd
,
rlen
,
rid
)
def
get_version
(
self
):
_id
=
self
.
get_new_id
()
cmd
=
(
self
.
int_to_bytes
(
0x401
,
np
.
uint32
)
# 4byte cmd
+
self
.
int_to_bytes
(
12
,
np
.
uint32
)
# 4byte lg frame
+
self
.
int_to_bytes
(
_id
,
np
.
uint32
)
# 4byte id
)
self
.
write_raw_bytes
(
cmd
)
ans
=
self
.
read_raw_all_bytes
()
ret
=
self
.
get_header
(
ans
)
lg
=
ret
[
1
]
if
lg
!=
16
:
log_error
(
self
,
"ND287 ERROR invalid version length rlen[%d]
\n
"
%
(
lg
,))
return
0
ver
=
self
.
bytes_to_int
(
ans
,
12
,
np
.
uint16
)
return
ver
def
get_error
(
self
):
cmd
=
(
self
.
int_to_bytes
(
0x605
,
np
.
uint32
)
# 4byte cmd
+
self
.
int_to_bytes
(
12
,
np
.
uint32
)
# 4byte lg frame
+
self
.
int_to_bytes
(
_id
,
np
.
uint32
)
# 4byte id
)
self
.
write_raw_bytes
(
cmd
)
ans
=
self
.
read_raw_all_bytes
()
ret
=
self
.
get_header
(
ans
)
err
=
ans
[
12
:].
split
(
b
"
\x00
"
)[
0
].
decode
()
if
not
len
(
err
):
return
"NO ERROR"
return
err
def
send_keystroke_bstr
(
self
,
bs
):
keys
=
b
"s0123456789C-.NEUDabcd"
lg
=
len
(
bs
)
val
=
list
()
for
i
in
range
(
lg
):
k
=
bs
[
i
]
if
k
in
keys
:
val
.
append
(
keys
.
find
(
k
)
-
1
)
else
:
raise
RuntimeError
(
"invalid key[%c][%d] valid[%s]"
%
(
k
,
k
,
keys
.
decode
())
)
print
(
val
)
for
i
in
range
(
lg
):
k
=
val
[
i
]
if
k
<
0
:
time
.
sleep
(.
1
)
else
:
status
=
self
.
send_keystroke_one
(
k
)
if
status
:
raise
RuntimeError
(
"ND287 ERROR in the key[%c][%d] (insufficient delay?)"
%
(
k
,
k
)
)
return
def
send_keystroke_one
(
self
,
k
):
_id
=
self
.
get_new_id
()
cmd
=
(
self
.
int_to_bytes
(
0x601
,
np
.
uint32
)
# 4byte cmd
+
self
.
int_to_bytes
(
13
,
np
.
uint32
)
# 4byte lg frame
+
self
.
int_to_bytes
(
_id
,
np
.
uint32
)
# 4byte id
+
self
.
int_to_bytes
(
k
,
np
.
uint8
)
# 1byte k
)
self
.
write_raw_bytes
(
cmd
)
ans
=
self
.
read_raw_all_bytes
()
ret
=
self
.
get_header
(
ans
)
status
=
self
.
bytes_to_int
(
ans
,
12
,
np
.
uint8
)
if
status
:
log_error
(
self
,
f
"ND287 ERROR key cmd [
{
k
}
] - timing?"
)
return
status
def
read_raw_all_bytes
(
self
):
b
=
self
.
comm
.
raw_read
(
1000
,
1
)
log_debug
(
self
,
f
"READ [
{
b
}
]"
)
return
b
def
write_raw_str
(
self
,
message
):
"""
Send <message> to the controller.
* type of <message> must be 'str'
* converts <message> into 'bytes'
* no terminator char ???
* send command to the device
* NO answer is read from controller
"""
self
.
comm
.
write
(
message
.
encode
())
def
write_raw_bytes
(
self
,
message
):
self
.
comm
.
write
(
message
)
log_debug
(
self
,
f
"WRITE [
{
message
}
]"
)
def
int_to_bytes
(
self
,
val
,
ty
):
res
=
np
.
array
([
val
],
dtype
=
ty
).
tobytes
()
return
res
def
bytes_to_int
(
self
,
ans
,
off
,
ty
):
if
(
ty
==
np
.
uint32
)
or
(
ty
==
np
.
int32
):
dsize
=
4
elif
(
ty
==
np
.
uint16
)
or
(
ty
==
np
.
int16
):
dsize
=
2
elif
(
ty
==
np
.
uint8
)
or
(
ty
==
np
.
int8
):
dsize
=
1
else
:
raise
RuntimeError
(
"ND287 ERROR invalid np.type"
)
val
=
np
.
int
(
np
.
frombuffer
(
ans
[
off
:
off
+
dsize
],
dtype
=
ty
))
return
val
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment