Single process, multiple greenlets: Redis connection exceptions
I have a tango server (this will be the external Bliss writer service) that launches multiple tango devices, each spawning a greenlet that starts listening to events of a particular Bliss session, so basically:
from bliss.data.node import get_session_node
...
session_node = get_session_node(session_name)
logger.info("Start listening to scans")
for event_type, node in session_node.iterator.walk_on_new_events(filter="scan"):
...
In the examples below I have three greenlets, listening to the Bliss sessions "nexus_writer_config", "nexus_writer_base", and "test_session" respectively. So I have 1 process and 3 greenlets that execute the code above.
Both calls, get_session_node and walk_on_new_events, can raise exceptions. @matias.guijarro: is this expected behaviour? That is, can listening to Redis from multiple greenlets in the same process cause connection exceptions? If so, I should probably run one tango server instance for each tango device that listens to Redis.
More details on the exceptions being thrown:
Issue 1. Concurrent calls to get_session_node can cause exceptions.
Here the call get_session_node("test_session") throws an exception, while get_session_node("nexus_writer_config") and get_session_node("nexus_writer_base") return fine ("Start listening to scans" in the output):
(miniconda3-latest) hpc3-2803:wout/dev/blissmain % BEACON_HOST=hpc3-2803:40208 TANGO_HOST=hpc3-2803:56347 NexusWriterService test --log=info
Unknown exception while trying to fill database cache...
Traceback (most recent call last):
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/connection.py", line 532, in _raw_read
raw_data = self._socket.recv(16 * 1024)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/gevent/_socket3.py", line 382, in recv
self._wait(self._read_event)
File "src/gevent/_hub_primitives.py", line 284, in gevent.__hub_primitives.wait_on_socket
File "src/gevent/_hub_primitives.py", line 289, in gevent.__hub_primitives.wait_on_socket
File "src/gevent/_hub_primitives.py", line 264, in gevent.__hub_primitives._primitive_wait
gevent.exceptions.ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.__waiter.Waiter object at 0x7fad3874d1d0>>
ERROR:nexus_writer_service.session_writer: [test_session (FAULT)] crashed with exception:
Traceback (most recent call last):
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/nexus_writer_service/session_writer.py", line 169, in _run
session_node = _get_session_node(self.session_name)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 131, in get_session_node
return DataNodeContainer(None, session_name)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 559, in __init__
**keys,
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 441, in __init__
connection = client.get_redis_connection(db=1)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/client.py", line 44, in f
return func(*args, **keys)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/client.py", line 105, in get_redis_connection
return connection.get_redis_connection(db=db)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/connection.py", line 62, in f
return func(self, *args, **keys)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/connection.py", line 348, in get_redis_connection
cnx = self.create_redis_connection(db=db)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/connection.py", line 360, in create_redis_connection
address = self.get_redis_connection_address()
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/connection.py", line 62, in f
return func(self, *args, **keys)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/connection.py", line 338, in get_redis_connection_address
self._socket.sendall(protocol.message(protocol.REDIS_QUERY))
AttributeError: 'NoneType' object has no attribute 'sendall'
INFO:nexus_writer_service.session_writer: [test_session (FAULT)] Session writer greenlet exits
Traceback (most recent call last):
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/connection.py", line 532, in _raw_read
raw_data = self._socket.recv(16 * 1024)
AttributeError: 'NoneType' object has no attribute 'recv'
Ready to accept request
INFO:nexus_writer_service.session_writer: [nexus_writer_config (ON)] Start listening to scans
INFO:nexus_writer_service.session_writer: [nexus_writer_base (ON)] Start listening to scans
Solution 1. I retry get_session_node until I get the node (timeout=x seconds). I can live with this.
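The retry approach can be sketched roughly as follows. This is a minimal illustration, not the actual code in session_writer.py: the helper name retry_until_timeout and its parameters (timeout, interval, exceptions, sleep) are hypothetical. Inside a greenlet, a cooperative sleep such as gevent.sleep would be passed in so that the other greenlets keep running while this one waits.

```python
import time


def retry_until_timeout(func, *args, timeout=10.0, interval=0.5,
                        exceptions=(Exception,), sleep=time.sleep, **kwargs):
    """Call func(*args, **kwargs) until it succeeds or `timeout` seconds pass.

    Hypothetical helper: only exceptions listed in `exceptions` trigger a
    retry; the last one is re-raised once the deadline has passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return func(*args, **kwargs)
        except exceptions:
            if time.monotonic() >= deadline:
                raise  # give up: propagate the most recent failure
            sleep(interval)  # in a greenlet, pass sleep=gevent.sleep


# Intended usage (not executed here, requires Bliss and gevent):
#   session_node = retry_until_timeout(
#       get_session_node, session_name, timeout=10, sleep=gevent.sleep
#   )
```

Restricting `exceptions` to the errors seen in the tracebacks (for example AttributeError) would avoid retrying on unrelated failures.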
Issue 2. Concurrent calls to walk_on_new_events can cause exceptions.
INFO:nexus_writer_service.session_writer: [nexus_writer_base (ON)] Start listening to scans
INFO:nexus_writer_service.session_writer: [test_session (ON)] Start listening to scans
Traceback (most recent call last):
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/conductor/connection.py", line 532, in _raw_read
raw_data = self._socket.recv(16 * 1024)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/gevent/_socket3.py", line 382, in recv
self._wait(self._read_event)
File "src/gevent/_hub_primitives.py", line 284, in gevent.__hub_primitives.wait_on_socket
File "src/gevent/_hub_primitives.py", line 289, in gevent.__hub_primitives.wait_on_socket
File "src/gevent/_hub_primitives.py", line 264, in gevent.__hub_primitives._primitive_wait
gevent.exceptions.ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.__waiter.Waiter object at 0x7f9a6c5ebd70>>
ERROR:nexus_writer_service.session_writer: [test_session (FAULT)] crashed with exception:
Traceback (most recent call last):
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 183, in _read_from_socket
data = recv(self._sock, socket_read_size)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/_compat.py", line 58, in recv
return sock.recv(*args, **kwargs)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/gevent/_socket3.py", line 382, in recv
self._wait(self._read_event)
File "src/gevent/_hub_primitives.py", line 284, in gevent.__hub_primitives.wait_on_socket
File "src/gevent/_hub_primitives.py", line 289, in gevent.__hub_primitives.wait_on_socket
File "src/gevent/_hub_primitives.py", line 271, in gevent.__hub_primitives._primitive_wait
File "src/gevent/_hub_primitives.py", line 46, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_hub_primitives.py", line 46, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_hub_primitives.py", line 55, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_waiter.py", line 151, in gevent.__waiter.Waiter.get
File "src/gevent/_greenlet_primitives.py", line 60, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 60, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 64, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
gevent._socketcommon.cancel_wait_ex: [Errno 9] File descriptor was closed in another greenlet
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/nexus_writer_service/session_writer.py", line 175, in _run
filter="scan"
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 313, in walk_on_new_events
self.jumpahead()
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 303, in jumpahead
data_node_2_children = self._get_grandchildren(db_name)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 254, in _get_grandchildren
"%s*_children_list" % db_name, connection=self.node.db_connection
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 252, in <listcomp>
x
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/settings.py", line 179, in scan
cursor, values = connection.scan(cursor=cursor, match=match, count=count)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/client.py", line 1822, in scan
return self.execute_command('SCAN', *pieces)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/client.py", line 836, in execute_command
conn = self.connection or pool.get_connection(command_name, **options)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 1071, in get_connection
connection.connect()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 547, in connect
self.on_connect()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 623, in on_connect
if nativestr(self.read_response()) != 'OK':
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 699, in read_response
response = self._parser.read_response()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 309, in read_response
response = self._buffer.readline()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 241, in readline
self._read_from_socket()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 208, in _read_from_socket
(ex.args,))
redis.exceptions.ConnectionError: Error while reading from socket: (9, 'File descriptor was closed in another greenlet')
Ready to accept request
INFO:nexus_writer_service.session_writer: [test_session (FAULT)] Session writer greenlet exits
ERROR:nexus_writer_service.session_writer: [nexus_writer_base (FAULT)] crashed with exception:
Traceback (most recent call last):
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 183, in _read_from_socket
data = recv(self._sock, socket_read_size)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/_compat.py", line 58, in recv
return sock.recv(*args, **kwargs)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/gevent/_socket3.py", line 382, in recv
self._wait(self._read_event)
File "src/gevent/_hub_primitives.py", line 284, in gevent.__hub_primitives.wait_on_socket
File "src/gevent/_hub_primitives.py", line 289, in gevent.__hub_primitives.wait_on_socket
File "src/gevent/_hub_primitives.py", line 271, in gevent.__hub_primitives._primitive_wait
File "src/gevent/_hub_primitives.py", line 46, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_hub_primitives.py", line 46, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_hub_primitives.py", line 55, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_waiter.py", line 151, in gevent.__waiter.Waiter.get
File "src/gevent/_greenlet_primitives.py", line 60, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 60, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 64, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
gevent._socketcommon.cancel_wait_ex: [Errno 9] File descriptor was closed in another greenlet
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/nexus_writer_service/session_writer.py", line 175, in _run
filter="scan"
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 313, in walk_on_new_events
self.jumpahead()
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 303, in jumpahead
data_node_2_children = self._get_grandchildren(db_name)
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 254, in _get_grandchildren
"%s*_children_list" % db_name, connection=self.node.db_connection
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/data/node.py", line 252, in <listcomp>
x
File "/mntdirect/_data_id21_inhouse/wout/dev/blissmain/bliss/config/settings.py", line 179, in scan
cursor, values = connection.scan(cursor=cursor, match=match, count=count)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/client.py", line 1822, in scan
return self.execute_command('SCAN', *pieces)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/client.py", line 836, in execute_command
conn = self.connection or pool.get_connection(command_name, **options)
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 1071, in get_connection
connection.connect()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 547, in connect
self.on_connect()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 623, in on_connect
if nativestr(self.read_response()) != 'OK':
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 699, in read_response
response = self._parser.read_response()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 309, in read_response
response = self._buffer.readline()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 241, in readline
self._read_from_socket()
File "/data/id21/inhouse/wout/dev/virtualenvs/rnice8/bliss/py37/lib/python3.7/site-packages/redis/connection.py", line 208, in _read_from_socket
(ex.args,))
redis.exceptions.ConnectionError: Error while reading from socket: (9, 'File descriptor was closed in another greenlet')
INFO:nexus_writer_service.session_writer: [nexus_writer_base (FAULT)] Session writer greenlet exits
INFO:nexus_writer_service.session_writer: [nexus_writer_config (ON)] Start listening to scans
Solution 2. I could take the same approach as for issue 1 (retry until timeout), but I'm not fond of that.
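One alternative to retrying would be to serialize the calls that set up the shared connection, so that only one greenlet at a time runs them. The sketch below assumes that the failures come from several greenlets using the same connection at the same moment, which the ConcurrentObjectUseError suggests but which has not been confirmed. The decorator name serialized is hypothetical. It would only protect short calls such as get_session_node; the long-running walk_on_new_events loop cannot be held under a lock in the same way.

```python
import functools
import threading


def serialized(lock):
    """Return a decorator that runs the wrapped function while holding `lock`.

    Hypothetical helper. In greenlet code the lock must be cooperative,
    e.g. gevent.lock.BoundedSemaphore(1); a plain threading.Lock is used
    below only so the sketch runs without gevent installed.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with lock:  # only one caller at a time gets past this point
                return func(*args, **kwargs)
        return wrapper
    return decorator


# Stand-in lock for the sketch; replace with a gevent lock in a greenlet
# based server, otherwise a blocked greenlet would stall the whole thread.
_connection_lock = threading.Lock()

# Intended usage (not executed here, requires Bliss):
#   safe_get_session_node = serialized(_connection_lock)(get_session_node)
#   session_node = safe_get_session_node(session_name)
```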