Commit 1ba9bb36 authored by Matias Guijarro's avatar Matias Guijarro

comm/scpi, controllers/keithley: adapted to map and logging

parent 90eea7e8
"""
:term:`SCPI` helpers (:class:`~bliss.comm.scpi.SCPI` class and \
:func:`~bliss.comm.scpi.BaseDevice` )
:func:`~bliss.comm.scpi.BaseSCPIDevice` )
Example::
......@@ -36,13 +36,14 @@ Example::
import re
import inspect
import logging
from functools import partial
import numpy
from .util import get_interface
from .exceptions import CommunicationError, CommunicationTimeout
from bliss.common import session
from bliss.common.logtools import LogMixin
def decode_IDN(s):
......@@ -380,7 +381,7 @@ def sanitize_msgs(*msgs, **opts):
return commands, queries, eol.join(result) + eol
class SCPI(object):
class SCPI(LogMixin):
"""
:term:`SCPI` language helper.
......@@ -410,15 +411,15 @@ class SCPI(object):
print scpi['*IDN']
"""
def __init__(self, *args, **kwargs):
interface, args, kwargs = get_interface(*args, **kwargs)
def __init__(self, interface=None, commands=COMMANDS, **kwargs):
self.interface = interface
session.get_current().map.register(
self, parents_list=["comms"], children_list=[self.interface], tag=str(self)
)
self._strict_query = kwargs.get("strict_query", True)
self._logger = logging.getLogger(str(self))
self._debug = self._logger.debug
self._contexts = []
self._eol = interface._eol
self.commands = Commands(kwargs.get("commands", COMMANDS))
self.commands = Commands(commands)
def enter_context(self):
context = dict(commands=[], result=None)
......@@ -594,7 +595,9 @@ class SCPI(object):
try:
result = getf(result)
except:
self._debug("Failed to convert result. Details:", exc_info=1)
self._logger.debug(
"Failed to convert result. Details:", exc_info=1
)
results.append((query, result))
return results
......@@ -656,7 +659,7 @@ class SCPI(object):
return stack or None
class BaseDevice(object):
class BaseSCPIDevice(LogMixin):
"""Base SCPI device class"""
def __init__(self, *args, **kwargs):
......@@ -664,7 +667,9 @@ class BaseDevice(object):
commands = kwargs.pop("commands", {})
self.interface = interface
self.language = SCPI(interface=interface, commands=commands)
self._logger = logging.getLogger(str(self))
session.get_current().map.register(
self, children_list=[self.language], tag=str(self)
)
def __str__(self):
return "{0}({1})".format(type(self).__name__, self.language)
......@@ -688,158 +693,3 @@ class BaseDevice(object):
def __exit__(self, etype, evalue, etraceback):
return self.language.exit_context(etype, evalue, etraceback)
def main(argv=None):
"""
Start a SCPI console
The following example will start a SCPI console with one SCPI instrument
called *s*::
$ python -m bliss.comm.scpi gpib --pad=15 enet://gpibhost
scpi> print s['*IDN']
"""
import sys
import argparse
try:
import serial
except:
serial = None
parser = argparse.ArgumentParser(description=main.__doc__)
parser.add_argument(
"--log-level",
type=str,
default="info",
choices=["trace", "debug", "info", "warning", "error"],
help="global log level [default: info]",
)
parser.add_argument(
"--scpi-log-level",
type=str,
default="info",
choices=["trace", "debug", "info", "warning", "error"],
help="log level for scpi object[default: info]",
)
parser.add_argument(
"--gevent",
action="store_true",
default=False,
help="enable gevent in console [default: False]",
)
subparsers = parser.add_subparsers(
title="connection",
dest="connection",
description="valid type of connections",
help="choose one type of connection",
)
gpib_parser = subparsers.add_parser("gpib", help="GPIB connection")
add = gpib_parser.add_argument
add(
"url", type=str, help="gpib instrument url (ex: gpibhost, enet://gpibhost:5000)"
)
add("--pad", type=int, required=True, help="primary address")
add("--sad", type=int, default=0, help="secondary address [default: 0]")
add(
"--tmo",
type=int,
default=10,
help="gpib timeout (gpib tmo unit) [default: 10 (=300ms)]",
)
add("--eos", type=str, default="\n", help=r"end of string [default: '\n']")
add("--timeout", type=float, default=0.4, help="socket timeout [default: 0.4]")
tcp_parser = subparsers.add_parser("tcp", help="TCP connection")
add = tcp_parser.add_argument
add("url", type=str, help="tcp instrument url (ex: host:5000, socket://host:5000)")
add("--timeout", type=float, default=5, help="timeout")
add("--eol", type=str, default="\n", help=r"end of line [default: '\n']")
if serial:
serial_parser = subparsers.add_parser("serial", help="serial line connection")
add = serial_parser.add_argument
add(
"port",
type=str,
help="serial instrument port (ex: rfc2217://.., ser2net://..)",
)
add("--baudrate", type=int, default=9600, help="baud rate")
add(
"--bytesize",
type=int,
choices=[6, 7, 8],
default=serial.EIGHTBITS,
help="byte size",
)
add(
"--parity",
choices=serial.PARITY_NAMES.keys(),
default=serial.PARITY_NONE,
help="parity type",
)
add("--timeout", type=float, default=5, help="timeout")
add(
"--stopbits",
type=float,
choices=[1, 1.5, 2],
default=serial.STOPBITS_ONE,
help="stop bits",
)
add("--xonxoff", action="store_true", default=False, help="")
add("--rtscts", action="store_true", default=False, help="")
add("--write-timeout", dest="writeTimeout", type=float, default=None, help="")
add("--dsrdtr", action="store_true", default=False, help="")
add(
"--interchar-timeout",
dest="interCharTimeout",
type=float,
default=None,
help="",
)
add("--eol", type=str, default="\n", help="end of line [default: '\\n']")
args = parser.parse_args()
vargs = vars(args)
log_level = vargs.pop("log_level").upper()
scpi_log_level = vargs.pop("scpi_log_level").upper()
logging.basicConfig(
level=log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s"
)
gevent_arg = vargs.pop("gevent")
conn = vargs.pop("connection")
kwargs = {conn: vargs, "commands": COMMANDS}
mode = not gevent_arg and "interactive, no gevent" or "gevent"
scpi = SCPI(**kwargs)
scpi._logger.setLevel(scpi_log_level)
local = dict(s=scpi)
banner = "\nWelcome to SCPI console " "(connected to {0}) ({1})\n".format(
scpi, mode
)
sys.ps1 = "scpi> "
sys.ps2 = len(sys.ps1) * "."
if gevent_arg:
try:
from gevent.monkey import patch_sys
except ImportError:
mode = "no gevent"
else:
patch_sys()
import code
code.interact(banner=banner, local=local)
if __name__ == "__main__":
main()
......@@ -91,13 +91,13 @@ from bliss.config.settings import HashSetting
from bliss.comm.exceptions import CommunicationError
from bliss.comm.scpi import Cmd as SCPICmd
from bliss.comm.scpi import Commands as SCPICommands
from bliss.comm.scpi import BaseDevice as BaseDeviceSCPI
from bliss.comm.scpi import BaseSCPIDevice
from .keithley_scpi_mapping import COMMANDS as SCPI_COMMANDS
from .keithley_scpi_mapping import MODEL_COMMANDS as SCPI_MODEL_COMMANDS
class KeithleySCPI(BaseDeviceSCPI):
class KeithleySCPI(BaseSCPIDevice):
"""Keithley instrument through SCPI language. Can be used with any Keithley
SCPI capable device.
......@@ -856,190 +856,3 @@ def create_sensor(config, node):
with ctrl:
obj = Sensor(node, ctrl)
return obj
def main():
"""
Start a Keithley console.
The following example will start a Keithley console with one Keithley
instrument called *k*::
$ python -m bliss.controllers.keithley gpib --pad=15 enet://gpibhost
keithley> print( k['*IDN?'] )
"""
import sys
import logging
import argparse
try:
import serial
except:
serial = None
parser = argparse.ArgumentParser(description=main.__doc__)
parser.add_argument(
"--model",
type=str,
default=None,
help="keithley model (ex: 6482) [default: auto discover]",
)
parser.add_argument(
"--log-level",
type=str,
default="info",
choices=["debug", "info", "warning", "error"],
help="log level [default: info]",
)
parser.add_argument(
"--scpi-log-level",
type=str,
default="info",
choices=["trace", "debug", "info", "warning", "error"],
help="log level for scpi object [default: info]",
)
parser.add_argument(
"--keithley-log-level",
type=str,
default="info",
choices=["trace", "debug", "info", "warning", "error"],
help="log level for keithley object [default: info]",
)
parser.add_argument(
"--gevent",
action="store_true",
default=False,
help="enable gevent in console [default: False]",
)
subparsers = parser.add_subparsers(
title="object/connection",
dest="connection",
description="config object name or valid type of connections",
help="choose keithley config object name or type of connection",
)
config_parser = subparsers.add_parser("config", help="keithey config object")
config_parser.add_argument("name", help="config object name")
gpib_parser = subparsers.add_parser("gpib", help="GPIB connection")
add = gpib_parser.add_argument
add(
"url", type=str, help="gpib instrument url (ex: gpibhost, enet://gpibhost:5000)"
)
add("--pad", type=int, required=True, help="primary address")
add("--sad", type=int, default=0, help="secondary address [default: 0]")
add(
"--tmo",
type=int,
default=10,
help="GPIB timeout (GPIB tmo unit) [default: 11 (=1s)]",
)
add("--eos", type=str, default="\n", help=r"end of string [default: '\n']")
add("--timeout", type=float, default=1.1, help="socket timeout [default: 1.1]")
tcp_parser = subparsers.add_parser("tcp", help="TCP connection")
add = tcp_parser.add_argument
add("url", type=str, help="TCP instrument url (ex: keith6485:25000)")
if serial:
serial_parser = subparsers.add_parser("serial", help="serial line connection")
add = serial_parser.add_argument
add(
"port",
type=str,
help="serial instrument port (ex: rfc2217://.., ser2net://..)",
)
add("--baudrate", type=int, default=9600, help="baud rate")
add(
"--bytesize",
type=int,
choices=[6, 7, 8],
default=serial.EIGHTBITS,
help="byte size",
)
add(
"--parity",
choices=list(serial.PARITY_NAMES.keys()),
default=serial.PARITY_NONE,
help="parity type",
)
add("--timeout", type=float, default=5, help="timeout")
add(
"--stopbits",
type=float,
choices=[1, 1.5, 2],
default=serial.STOPBITS_ONE,
help="stop bits",
)
add("--xonxoff", action="store_true", default=False, help="")
add("--rtscts", action="store_true", default=False, help="")
add("--write-timeout", dest="writeTimeout", type=float, default=None, help="")
add("--dsrdtr", action="store_true", default=False, help="")
add(
"--interchar-timeout",
dest="interCharTimeout",
type=float,
default=None,
help="",
)
add("--eol", type=str, default="\n", help="end of line [default: '\\n']")
args = parser.parse_args()
vargs = vars(args)
model = vargs.pop("model", None)
log_level = getattr(logging, vargs.pop("log_level").upper())
keithley_log_level = vargs.pop("keithley_log_level").upper()
scpi_log_level = vargs.pop("scpi_log_level").upper()
logging.basicConfig(
level=log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s"
)
gevent_arg = vargs.pop("gevent")
conn = vargs.pop("connection")
local = {}
if conn == "config":
from bliss.config.static import get_config
config = get_config()
name = vargs["name"]
keithley = create_objects_from_config_node(config, config.get_config(name))[
name
]
if isinstance(keithley, Sensor):
sensor = keithley
keithley = sensor.controller
local["s"] = sensor
else:
kwargs = {conn: vargs, "model": model}
keithley = KeithleySCPI(**kwargs)
local["k"] = keithley
keithley._logger.setLevel(keithley_log_level)
keithley.language._logger.setLevel(scpi_log_level)
keithley.interface._logger.setLevel(scpi_log_level)
sys.ps1 = "keithley> "
sys.ps2 = len(sys.ps1) * "."
if gevent_arg:
try:
from gevent.monkey import patch_sys
except ImportError:
mode = "no gevent"
else:
patch_sys()
import code
mode = not gevent_arg and "interactive, no gevent" or "gevent"
banner = "\nWelcome to Keithley console " "(connected to {0}) ({1})\n".format(
keithley, mode
)
code.interact(banner=banner, local=local)
if __name__ == "__main__":
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment