Commit a93eecfd authored by Matias Guijarro's avatar Matias Guijarro
Browse files

Merge branch 'tcp_command_fixes' into 'master'

Tcp command fixes

Closes #90

See merge request !391
parents f33880fa 232a5093
......@@ -23,6 +23,8 @@ from .exceptions import CommunicationError, CommunicationTimeout
from ..common.greenlet_utils import KillMask
from .util import HexMsg
from bliss.common.task_utils import error_cleanup
class SocketTimeout(CommunicationTimeout):
'''Socket timeout error'''
......@@ -299,19 +301,26 @@ class Command:
return self
def __exit__(self, *args):
while not self.__transaction.empty():
read_value = self.__transaction.get()
if not isinstance(read_value,socket.error):
self.data += self.__transaction.get()
with self.__socket._lock:
try:
trans_index = self.__socket._transaction_list.index(self.__transaction)
except ValueError: # not in list weird
return
if trans_index is 0:
while not self.__transaction.empty():
read_value = self.__transaction.get()
if not isinstance(read_value,socket.error):
self.data += read_value
if self.__clear_transaction and \
len(self.__socket._transaction_list) > 1:
self.__socket._transaction_list[1].put(self.data)
else:
self.__transaction.put(self.data)
if self.__clear_transaction and \
len(self.__socket._transaction_list) > 1:
self.__socket._transaction_list[1].put(self.data)
else:
self.__transaction.put(self.data)
if self.__clear_transaction:
self.__socket._transaction_list.pop(0)
if self.__clear_transaction:
self.__socket._transaction_list.pop(trans_index)
def __init__(self, host, port,
eol='\n', # end of line for each rx message
......@@ -429,7 +438,8 @@ class Command:
if transaction is None and create_transaction:
transaction = self.new_transaction()
self._debug("Tx: %r %r",msg, HexMsg(msg))
self._fd.sendall(msg)
with error_cleanup(self._pop_transaction,transaction=transaction):
self._fd.sendall(msg)
return transaction
def write(self, msg, timeout=None):
......@@ -487,10 +497,11 @@ class Command:
try:
while(1):
raw_data = fd.recv(16 * 1024)
if raw_data and command._transaction_list:
command._transaction_list[0].put(raw_data)
else:
break
with command._lock:
if raw_data and command._transaction_list:
command._transaction_list[0].put(raw_data)
else:
break
except:
pass
finally:
......@@ -499,8 +510,9 @@ class Command:
command._connected = False
command._fd = None
#inform all pending transaction that the socket is closed
for trans in command._transaction_list:
trans.put(socket.error(errno.EPIPE,"Broken pipe"))
with command._lock:
for trans in command._transaction_list:
trans.put(socket.error(errno.EPIPE,"Broken pipe"))
except ReferenceError:
pass
......@@ -510,6 +522,9 @@ class Command:
self._transaction_list.append(data_queue)
return data_queue
def _pop_transaction(self,transaction=None):
index = self._transaction_list.index(transaction)
self._transaction_list.pop(index)
class TcpError(CommunicationError):
'''TCP communication error'''
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment