emulator.py 14.5 KB
Newer Older
1
2
3
4
# -*- coding: utf-8 -*-
#
# This file is part of the bliss project
#
5
# Copyright (c) 2015-2022 Beamline Control Unit, ESRF
6
7
# Distributed under the GNU LGPLv3. See LICENSE for more info.

Lucas Felix's avatar
Lucas Felix committed
8
9
"""Emulator :mod:`~tests.emulators.emulator.Server` and \
:mod:`~tests.emulators.emulator.BaseDevice`
10
11
12

Quick start
-----------
13
14
15
16
17
18
19
20
21
22
23
24

To create a server use the following configuration as a starting point:

.. code-block:: yaml

    name: my_emulator
    devices:
        - class: SCPI
          transports:
              - type: tcp
                url: :25000

25
To start the server you can do something like::
26

Lucas Felix's avatar
Lucas Felix committed
27
    $ python -m tests.emulators.emulator my_emulator
28

29
(bliss also provides a ``bliss-emulator`` script which basically does the same)
30

31
An emulator how-to is available :ref:`here <bliss-emulator-how-to>`.
32
33
34
35
36
37
38
39
40
41
42
43
44
45
"""


import os
import pty
import sys
import logging
import weakref

import gevent
from gevent.baseserver import BaseServer
from gevent.server import StreamServer
from gevent.fileobject import FileObject

46
_log = logging.getLogger("emulator")
47

48
49
50
51
52
53
54
55
56
__all__ = [
    "Server",
    "BaseDevice",
    "EmulatorServerMixin",
    "SerialServer",
    "TCPServer",
    "main",
    "create_server_from_config",
]
57

58
59
60
61
62
63
64
65
66
67
68
69
70

class EmulatorServerMixin(object):
    """
    Mixin class for TCP/Serial servers to handle line based commands.
    Internal usage only
    """

    def __init__(self, device=None, newline=None, baudrate=None):
        self.device = device
        self.baudrate = baudrate
        self.newline = device.newline if newline is None else newline
        self.special_messages = set(device.special_messages)
        self.connections = {}
71
72
73
74
75
76
77
78
79
80
        name = "{0}({1}, device={2})".format(
            type(self).__name__, self.address, device.name
        )
        self._log = logging.getLogger("{0}.{1}".format(_log.name, name))
        self._log.info(
            "listening on %s (newline=%r) (baudrate=%s)",
            self.address,
            self.newline,
            self.baudrate,
        )
81
82

    def handle(self, sock, addr):
83
        file_obj = sock.makefile(mode="rb")
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
        self.connections[addr] = file_obj, sock
        try:
            return self.__handle(sock, file_obj)
        finally:
            file_obj.close()
            del self.connections[addr]

    def __handle(self, sock, file_obj):
        """
        Handle new connection and requests

        Arguments:
            sock (gevent.socket.socket): new socket resulting from an accept
            addr tuple): address (tuple of host, port)
        """
99
        if self.newline == "\n" and not self.special_messages:
Lucas Felix's avatar
Lucas Felix committed
100
101
            for buf in file_obj:
                line = buf.decode()
102
103
104
105
                self.handle_line(sock, line)
        else:
            # warning: in this mode read will block even if client
            # disconnects. Need to find a better way to handle this
106
            buff = ""
107
108
            finish = False
            while not finish:
Lucas Felix's avatar
Lucas Felix committed
109
                readout = file_obj.read(1).decode()
110
111
112
113
                if not readout:
                    return
                buff += readout
                if buff in self.special_messages:
114
115
                    lines = (buff,)
                    buff = ""
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
                else:
                    lines = buff.split(self.newline)
                    buff, lines = lines[-1], lines[:-1]
                for line in lines:
                    if not line:
                        return
                    self.handle_line(sock, line)

    def handle_line(self, sock, line):
        """
        Handle a single command line. Emulates a delay if baudrate is defined
        in the configuration.

        Arguments:
            sock (gevent.socket.socket): new socket resulting from an accept
            addr (tuple): address (tuple of host, port)
            line (str): line to be processed
        """
        self.pause(len(line))
        response = self.device.handle_line(line)
        if response is not None:
Lucas Felix's avatar
Lucas Felix committed
137
138
139
            buf = response.encode()
            self.pause(len(buf))
            sock.sendall(buf)
140
141
142
143
144
145
146
147
148
149
150
151
152

    def pause(self, nb_bytes):
        """
        Emulate a delay simulating the transport of the given number of bytes,
        correspoding to the baudrate defined in the configuration

        Arguments:
            nb_bytes (int): number of bytes to transport
        """
        # emulate baudrate
        if not self.baudrate:
            return
        byterate = self.baudrate / 10.0
153
154
        sleep_time = nb_bytes / byterate
        gevent.sleep(sleep_time)
155
156

    def broadcast(self, msg):
Vincent Michel's avatar
Vincent Michel committed
157
        for _, (_, sock) in list(self.connections.items()):
158
159
160
            try:
                sock.sendall(msg)
            except:
161
                self._log.exception("error in broadcast")
162
163
164
165


class SerialServer(BaseServer, EmulatorServerMixin):
    """
166
    Serial line emulation server. It uses :func:`pty.opentpy` to open a
167
168
169
170
    pseudo-terminal simulating a serial line.
    """

    def __init__(self, *args, **kwargs):
171
172
173
174
175
        device = kwargs.pop("device")
        self.link_name = kwargs.pop("url")
        e_kwargs = dict(
            baudrate=kwargs.pop("baudrate", None), newline=kwargs.pop("newline", None)
        )
176
177
178
        BaseServer.__init__(self, None, *args, **kwargs)
        EmulatorServerMixin.__init__(self, device, **e_kwargs)

179
180
181
182
183
184
185
186
187
    def __del__(self):
        try:
            print("Removing pseudo terminal link : %s" % self.link_name)
            os.remove(self.link_name)
        except:
            print("pseudo terminal link no more present ?")

    def terminate(self):
        try:
188
189
190
191
            print(
                "terminate of SerialServer : Removing pseudo terminal link : %s"
                % self.link_name
            )
192
193
194
195
            os.remove(self.link_name)
        except:
            print("pseudo terminal link no more present ?")

196
197
    def set_listener(self, listener):
        """
198
199
        Override of :meth:`~gevent.baseserver.BaseServer.set_listener` to
        initialize a pty and properly fill the address
200
201
        """
        if listener is None:
202
203
204
205
            (
                self.master,
                self.slave,
            ) = (
206
207
                pty.openpty()
            )  # returns the 2 file descriptors within the current process.
208
209
        else:
            self.master, self.slave = listener
210

211
212
213
214
215
216
        self.address = os.ttyname(
            self.slave
        )  # /dev/pts/N : slave pts to use to communicate with emulator.
        self.fileobj = FileObject(
            self.master, mode="rb"
        )  # <FileObjectPosix <SocketAdapter at 0x1723490 (7, 'rb')>>
217
218

        # Make a link to the randomly named pseudo-terminal with a known name.
219
        link_path, link_fname = os.path.split(self.link_name)
220
221
222
223
        try:
            os.remove(self.link_name)
        except:
            pass
224
225
        if not os.path.exists(link_path):
            os.makedirs(link_path)
226
        os.symlink(self.address, self.link_name)
227
228
229
230
        print(
            'Created symbolic link "%s" to emulator pseudo terminal "%s" '
            % (self.link_name, self.address)
        )
231
232
233
234

    @property
    def socket(self):
        """
235
236
        Override of :meth:`~gevent.baseserver.BaseServer.socket` to return a
        socket object for the pseudo-terminal file object
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
        """
        return self.fileobj._sock

    def _do_read(self):
        # override _do_read to properly handle pty
        try:
            self.do_handle(self.socket, self.address)
        except:
            self.loop.handle_error(([self.address], self), *sys.exc_info())
            if self.delay >= 0:
                self.stop_accepting()
                self._timer = self.loop.timer(self.delay)
                self._timer.start(self._start_accepting_if_started)
                self.delay = min(self.max_delay, self.delay * 2)


class TCPServer(StreamServer, EmulatorServerMixin):
    """
    TCP emulation server
    """

    def __init__(self, *args, **kwargs):
259
        listener = kwargs.pop("url")
260
261
        if isinstance(listener, list):
            listener = tuple(listener)
262
263
264
265
        device = kwargs.pop("device")
        e_kwargs = dict(
            baudrate=kwargs.pop("baudrate", None), newline=kwargs.pop("newline", None)
        )
266
267
268
269
270
        StreamServer.__init__(self, listener, *args, **kwargs)
        EmulatorServerMixin.__init__(self, device, **e_kwargs)

    def handle(self, sock, addr):
        info = self._log.info
271
        info("new connection from %s", addr)
272
        EmulatorServerMixin.handle(self, sock, addr)
273
        info("client disconnected %s", addr)
274
275
276
277
278
279
280
281


class BaseDevice(object):
    """
    Base intrument class. Override to implement an emulator for a specific
    device
    """

282
    DEFAULT_NEWLINE = "\n"
283
284
285

    special_messages = set()

286
    def __init__(self, name, newline=None, **kwargs):
287
        self.name = name
288
        self.newline = self.DEFAULT_NEWLINE if newline is None else newline
289
        self._log = logging.getLogger("{0}.{1}".format(_log.name, name))
290
        self.__transports = weakref.WeakKeyDictionary()
291
        if kwargs:
292
            self._log.warning(
Vincent Michel's avatar
Vincent Michel committed
293
                "constructor keyword args ignored: %s", ", ".join(list(kwargs.keys()))
294
            )
295
296
297
298

    @property
    def transports(self):
        """the list of registered transports"""
Vincent Michel's avatar
Vincent Michel committed
299
        return list(self.__transports.keys())
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332

    @transports.setter
    def transports(self, transports):
        self.__transports.clear()
        for transport in transports:
            self.__transports[transport] = None

    def handle_line(self, line):
        """
        To be implemented by the device.

        Raises: NotImplementedError
        """
        raise NotImplementedError

    def broadcast(self, msg):
        """
        broadcast the given message to all the transports

        Arguments:
            msg (str): message to be broadcasted
        """
        for transport in self.transports:
            transport.broadcast(msg)


class Server(object):
    """
    The emulation server

    Handles a set of devices
    """

333
    def __init__(self, name="", devices=(), backdoor=None):
334
        self.name = name
335
336
        self._log = logging.getLogger("{0}.{1}".format(_log.name, name))
        self._log.info("Bootstraping server")
337
338
        if backdoor:
            from gevent.backdoor import BackdoorServer
339
340
341
342
343
344
345
346
347

            banner = (
                "Welcome to Bliss emulator server console.\n"
                "My name is {0!r}. You can access me through the "
                "'server()' function. Have fun!".format(name)
            )
            self.backdoor = BackdoorServer(
                backdoor, banner=banner, locals=dict(server=weakref.ref(self))
            )
348
            self.backdoor.start()
349
            self._log.info("Backdoor opened at %r", backdoor)
350
351

        else:
352
            self._log.info("no backdoor declared")
353

354
355
356
357
358
        self.devices = {}
        for device in devices:
            try:
                self.create_device(device)
            except Exception as error:
359
360
361
362
363
                dname = device.get("name", device.get("class", "unknown"))
                self._log.error(
                    "error creating device %s (will not be available): %s", dname, error
                )
                self._log.debug("details: %s", error, exc_info=1)
364

365
366
367
368
369
    def terminate(self):
        for device in self.devices:
            for tp in device.transports:
                tp.terminate()

370
    def create_device(self, device_info):
371
372
373
        klass_name = device_info.get("class")
        name = device_info.get("name", klass_name)
        self._log.info("Creating device %s (%r)", name, klass_name)
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
        device, transports = create_device(device_info)
        self.devices[device] = transports
        return device, transports

    def get_device_by_name(self, name):
        for device in self.devices:
            if device.name == name:
                return device

    def start(self):
        for device in self.devices:
            for interface in self.devices[device]:
                interface.start()

    def stop(self):
        for device in self.devices:
            for interface in self.devices[device]:
                interface.stop()

    def serve_forever(self):
        stop_events = []
        for device in self.devices:
            for interface in self.devices[device]:
                stop_events.append(interface._stop_event)
        self.start()
        try:
            gevent.joinall(stop_events)
        finally:
            self.stop()

    def __str__(self):
405
        return "{0}({1})".format(self.__class__.__name__, self.name)
406
407


408
409
def create_device(device_info):
    device_info = dict(device_info)
410
411
412
413
    class_name = device_info.pop("class")
    module_name = device_info.pop("module", class_name.lower())
    package_name = device_info.pop("package", None)
    name = device_info.pop("name", class_name)
414
415

    if package_name is None:
Lucas Felix's avatar
Lucas Felix committed
416
        package_name = "tests.emulators." + module_name
417
418
419

    __import__(package_name)
    package = sys.modules[package_name]
420
421
    klass = getattr(package, class_name)
    device = klass(name, **device_info)
422

423
    transports_info = device_info.pop("transports", ())
424
425
426
    transports = []
    for interface_info in transports_info:
        ikwargs = dict(interface_info)
427
428
        itype = ikwargs.pop("type", "tcp")
        if itype == "tcp":
429
            iklass = TCPServer
430
        elif itype == "serial":
431
            iklass = SerialServer
432
        ikwargs["device"] = device
433
434
435
436
437
438
439
        transports.append(iklass(**ikwargs))
    device.transports = transports
    return device, transports


def create_server_from_config(config, name):
    cfg = config.get_config(name)
440
    backdoor, devices = cfg.get("backdoor", None), cfg.get("devices", ())
441
442
443
444
445
446
447
    return Server(name=name, devices=devices, backdoor=backdoor)


def main():
    import argparse
    from bliss.config.static import get_config

448
449
450
451
452
453
454
455
456
457
    parser = argparse.ArgumentParser(description=__doc__.split("\n")[1])
    parser.add_argument(
        "name", help="server name as defined in the static configuration"
    )
    parser.add_argument(
        "--log-level",
        default="WARNING",
        help="log level",
        choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"],
    )
458
459
    args = parser.parse_args()

460
    fmt = "%(asctime)-15s %(levelname)-5s %(name)s: %(message)s"
461
462
463
464
465
466
467
468
469
470
    level = getattr(logging, args.log_level.upper())
    logging.basicConfig(format=fmt, level=level)
    config = get_config()

    server = create_server_from_config(config, args.name)

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nCtrl-C Pressed. Bailing out...")
471
472
        try:
            server.terminate()
473
            print("Server terminated... I'll be back.")
474
        except:
475
476
            print("No terminate function for server or error in terminating.")

477
478
479

if __name__ == "__main__":
    main()