qgevent.py 8.29 KB
Newer Older
1 2 3 4 5 6
"""A Qt event dispatcher based on the gevent event loop.

Example usage:

    import sys
    from PyQt5.QtWidgets import QApplication
7
    from bliss.flint.qgevent import set_gevent_dispatcher
8 9 10 11 12 13 14 15 16 17 18 19 20 21

    if __name__ == '__main__':
        set_gevent_dispatcher()
        app = QApplication(sys.argv)
        sys.exit(app.exec_())

"""

import os
import time

import gevent
import gevent.select

22
import weakref
23
from PyQt5.QtWidgets import QApplication
24 25 26 27 28 29 30
from PyQt5.QtCore import (
    QCoreApplication,
    QAbstractEventDispatcher,
    QEventLoop,
    QTimerEvent,
    QEvent,
)
31

32
from bliss.flint import qwindowsystem
33 34 35 36

__all__ = ["QGeventDispatcher", "set_gevent_dispatcher"]


37 38 39 40 41 42 43 44
def synchronize(fn):
    def _(self, *args, **kwargs):
        with self._lock:
            return fn(self, *args, **kwargs)

    return _


45 46 47 48 49 50 51 52 53 54 55 56 57 58
class QGeventDispatcher(QAbstractEventDispatcher):
    """A Qt event dispatcher based on the gevent event loop.

    It allows for gevent concurrency within a qt application.
    """

    def __init__(self):
        # Parent call
        super(QGeventDispatcher, self).__init__()

        # Pipe for thread-safe communication
        self._read_pipe, self._write_pipe = os.pipe()

        # {obj: {tid: timer_info}} dictionary
59
        self._timer_infos = weakref.WeakKeyDictionary()
60 61 62 63 64 65 66 67 68 69 70
        # {tid: (obj, timer_task)} dictionary
        self._timer_tasks = {}

        # Notifier dictionaries
        self._read_notifiers = {}
        self._write_notifiers = {}
        self._error_notifiers = {}

        # Internal flag
        self._interrupted = False

71 72
        self._lock = gevent.lock.RLock()

73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
    # Thread-safe communication

    def wakeUp(self):
        """Wake the event loop up.

        Thread-safe.
        """
        os.write(self._write_pipe, b".")

    def interrupt(self):
        """Interrupt processEvents and wake up the event loop.

        Thread-safe.
        """
        self._interrupted = True
        self.wakeUp()

    # Event management

    def flush(self):
        """Send posted events."""
        QApplication.sendPostedEvents()

    def hasPendingEvents(self):
        """True if the QApplication or the WindowSystemInterface
        has events to process, False otherwise
        """
        return bool(
            qwindowsystem.globalPostedEventsCount()
            or qwindowsystem.windowSystemEventsQueued()
        )

    def processEvents(self, flags):
        """Process QApplication, WindowSystemInterface and socket events."""
        # Reset interrupted flag
        self._interrupted = False

        # Emit awake signal
        self.awake.emit()

        # Send all posted events
        QApplication.sendPostedEvents()

        # Check for interruption
        if self._interrupted:
            return False

        # Manage flags
        exclude_notifiers = flags & QEventLoop.ExcludeSocketNotifiers
        wait_for_more_events = flags & QEventLoop.WaitForMoreEvents

        # Emit about-to-block signal
        if wait_for_more_events:
            self.aboutToBlock.emit()
            timeout = None
        else:
129
            timeout = 0.0
130 131 132 133 134 135 136 137 138 139 140

        # Poll the file descriptors
        rlist = [self._read_pipe]
        rlist += [] if exclude_notifiers else list(self._read_notifiers)
        wlist = [] if exclude_notifiers else list(self._write_notifiers)
        elist = [] if exclude_notifiers else list(self._error_notifiers)
        read_events, write_events, error_events = gevent.select.select(
            rlist, wlist, elist, timeout=timeout
        )
        # Flush the thread communication pipe
        if self._read_pipe in read_events:
141 142 143 144 145 146
            while True:
                os.read(self._read_pipe, 64 * 1024)
                r, _, _ = gevent.select.select([self._read_pipe], [], [], 0)
                if not r:
                    break

147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
            read_events.remove(self._read_pipe)

        # Get all activated notifiers
        all_events = read_events, write_events, error_events
        all_notifiers = (
            self._read_notifiers,
            self._write_notifiers,
            self._error_notifiers,
        )
        notifiers = [
            notifiers[event]
            for events, notifiers in zip(all_events, all_notifiers)
            for event in events
        ]

        # Send all notifier events
        for notifier in notifiers:
            QApplication.sendEvent(notifier, QEvent(QEvent.SockAct))

        # Process window system interface events
        wsi_events = qwindowsystem.sendWindowSystemEvents(flags)

        # Return True if some events have been processed, False otherwise
        return bool(wsi_events or notifiers)

    # Timers
173
    @synchronize
174 175 176 177 178 179
    def registeredTimers(self, obj):
        """Get the timer info for all registered timers for a given object."""
        if obj not in self._timer_infos:
            return []
        return list(self._timer_infos[obj].values())

180
    @synchronize
181
    def registerTimer(self, tid, interval, ttype, obj):
182

183
        """Register a new timer, given a unique tid."""
184 185 186
        d = self._timer_infos.setdefault(obj, dict())
        d[tid] = self.TimerInfo(tid, interval, ttype)
        wobj = weakref.ref(obj)
187
        self._timer_tasks[tid] = (
188 189
            wobj,
            gevent.spawn(self._timer_run, interval / 1000.0, wobj, tid),
190 191
        )

192
    @synchronize
193
    def unregisterTimer(self, tid):
194

195
        """Unregister the timer corresponding to the given tid."""
196 197
        wobj, timer_task = self._timer_tasks.pop(tid, (None, None))
        if wobj is None:
198
            return True
199 200 201
        obj = wobj()
        if obj is not None:
            self._timer_infos[obj].pop(tid)
202 203 204
        timer_task.kill()
        return True

205
    @synchronize
206 207 208 209 210 211 212 213 214
    def unregisterTimers(self, obj):
        """Unregister all the timers corresponding to the given object."""
        if obj not in self._timer_infos:
            return False
        tids = list(self._timer_infos[obj])
        for tid in tids:
            self.unregisterTimer(tid)
        return True

215 216 217 218 219 220 221 222
    @staticmethod
    def _post_event(wobj, tid):
        obj = wobj()
        if obj is None:
            raise RuntimeError

        QCoreApplication.postEvent(obj, QTimerEvent(tid))

223 224 225 226 227
    def _timer_run(self, interval, obj, tid):
        """Target for the timer background tasks."""
        deadline = time.time()
        while True:
            deadline += interval
228
            sleep_time = max(0, deadline - time.time())
229
            # Sleep to the deadline
230 231
            if sleep_time < 1e-6:
                gevent.sleep(0)  # idle()
232 233 234
                # obviously we don't follow
                # synchronize  with the clock
                deadline = time.time()
235 236
            else:
                gevent.sleep(sleep_time)
237 238
            try:
                # Use postEvent to avoid auto cancellation
239 240 241 242 243 244
                if self._timer_tasks.get(tid, (None, None)) != (
                    obj,
                    gevent.getcurrent(),
                ):
                    break
                self._post_event(obj, tid)
245 246 247
            except RuntimeError:
                # obj is dead, cannot post event
                break
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283

    # Sockets

    def registerSocketNotifier(self, notifier):
        fd = int(notifier.socket())
        types = notifier.Read, notifier.Write, notifier.Exception
        dicts = self._read_notifiers, self._write_notifiers, self._error_notifiers
        # Find the right notifier type
        for ntype, fds_dict in zip(types, dicts):
            if notifier.type() != ntype:
                continue
            # Prevent multiple registration
            if fd in fds_dict:
                raise ValueError(
                    "A notifier for socket {} and type {}"
                    "has already been registered".format(fd, notifier.type())
                )
            # Register the notifier
            fds_dict[fd] = notifier
            return

    def unregisterSocketNotifier(self, notifier):
        types = notifier.Read, notifier.Write, notifier.Exception
        dicts = self._read_notifiers, self._write_notifiers, self._error_notifiers
        # Find the right notifier type
        for ntype, fds_dict in zip(types, dicts):
            if notifier.type() != ntype:
                continue
            # Unregister the notifier
            fds_dict.pop(int(notifier.socket()))
            return


def set_gevent_dispatcher():
    """Set gevent qt event dispatcher."""
    QApplication.setEventDispatcher(QGeventDispatcher())