Commit 1e056e21 authored by Matias Guijarro's avatar Matias Guijarro

conductor/connection.py: renamed self._fd => self._socket since it is a socket, not a fd

parent 7fe90f13
...@@ -126,7 +126,6 @@ class Connection(object): ...@@ -126,7 +126,6 @@ class Connection(object):
cnt._message_queue.pop(self._message_key, None) cnt._message_queue.pop(self._message_key, None)
def __init__(self, host=None, port=None): def __init__(self, host=None, port=None):
self._socket = None
if host is None: if host is None:
host = os.environ.get("BEACON_HOST") host = os.environ.get("BEACON_HOST")
if host is not None and ":" in host: if host is not None and ":" in host:
...@@ -149,14 +148,14 @@ class Connection(object): ...@@ -149,14 +148,14 @@ class Connection(object):
self._message_queue = {} self._message_queue = {}
self._redis_connection = {} self._redis_connection = {}
self._clean() self._clean()
self._fd = None self._socket = None
self._raw_read_task = None self._raw_read_task = None
self._greenlet_to_lockobjects = weakref.WeakKeyDictionary() self._greenlet_to_lockobjects = weakref.WeakKeyDictionary()
def close(self): def close(self):
if self._fd: if self._socket:
self._fd.close() self._socket.close()
self._fd = None self._socket = None
if self._raw_read_task is not None: if self._raw_read_task is not None:
self._raw_read_task.join() self._raw_read_task.join()
self._raw_read_task = None self._raw_read_task = None
...@@ -175,7 +174,7 @@ class Connection(object): ...@@ -175,7 +174,7 @@ class Connection(object):
def connect(self): def connect(self):
# Already connected # Already connected
if self._fd is not None: if self._socket is not None:
return return
# Address undefined # Address undefined
...@@ -184,10 +183,10 @@ class Connection(object): ...@@ -184,10 +183,10 @@ class Connection(object):
# UDS connection # UDS connection
if self.uds: if self.uds:
self._fd = self._uds_connect(self.uds) self._socket = self._uds_connect(self.uds)
# TCP connection # TCP connection
else: else:
self._fd = self._tcp_connect(self._host, self._port) self._socket = self._tcp_connect(self._host, self._port)
# Spawn read task # Spawn read task
self._raw_read_task = gevent.spawn(self._raw_read) self._raw_read_task = gevent.spawn(self._raw_read)
...@@ -246,27 +245,27 @@ class Connection(object): ...@@ -246,27 +245,27 @@ class Connection(object):
return self._discovery(host, timeout=timeout) return self._discovery(host, timeout=timeout)
def _tcp_connect(self, host, port): def _tcp_connect(self, host, port):
fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
fd.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
fd.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10) sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)
try: try:
fd.connect((host, port)) sock.connect((host, port))
except IOError: except IOError:
raise RuntimeError( raise RuntimeError(
"Conductor server on host `{}:{}' does not reply (check beacon server)".format( "Conductor server on host `{}:{}' does not reply (check beacon server)".format(
host, port host, port
) )
) )
return fd return sock
def _uds_connect(self, uds_path): def _uds_connect(self, uds_path):
fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
fd.connect(uds_path) sock.connect(uds_path)
return fd return sock
def _uds_query(self, timeout=1.0): def _uds_query(self, timeout=1.0):
self._uds_query_event.clear() self._uds_query_event.clear()
self._fd.sendall( self._socket.sendall(
protocol.message(protocol.UDS_QUERY, socket.gethostname().encode()) protocol.message(protocol.UDS_QUERY, socket.gethostname().encode())
) )
self._uds_query_event.wait(timeout) self._uds_query_event.wait(timeout)
...@@ -282,7 +281,9 @@ class Connection(object): ...@@ -282,7 +281,9 @@ class Connection(object):
timeout, RuntimeError("lock timeout (%s)" % str(devices_name)) timeout, RuntimeError("lock timeout (%s)" % str(devices_name))
): ):
while 1: while 1:
self._fd.sendall(protocol.message(protocol.LOCK, wait_lock.msg())) self._socket.sendall(
protocol.message(protocol.LOCK, wait_lock.msg())
)
status = wait_lock.get() status = wait_lock.get()
if status == protocol.LOCK_OK_REPLY: if status == protocol.LOCK_OK_REPLY:
break break
...@@ -304,7 +305,7 @@ class Connection(object): ...@@ -304,7 +305,7 @@ class Connection(object):
with gevent.Timeout( with gevent.Timeout(
timeout, RuntimeError("unlock timeout (%s)" % str(devices_name)) timeout, RuntimeError("unlock timeout (%s)" % str(devices_name))
): ):
self._fd.sendall(protocol.message(protocol.UNLOCK, msg)) self._socket.sendall(protocol.message(protocol.UNLOCK, msg))
locked_objects = self._greenlet_to_lockobjects.setdefault( locked_objects = self._greenlet_to_lockobjects.setdefault(
gevent.getcurrent(), dict() gevent.getcurrent(), dict()
) )
...@@ -326,7 +327,7 @@ class Connection(object): ...@@ -326,7 +327,7 @@ class Connection(object):
): ):
while self._redis_host is None: while self._redis_host is None:
self._redis_query_event.clear() self._redis_query_event.clear()
self._fd.sendall(protocol.message(protocol.REDIS_QUERY)) self._socket.sendall(protocol.message(protocol.REDIS_QUERY))
self._redis_query_event.wait() self._redis_query_event.wait()
return self._redis_host, self._redis_port return self._redis_host, self._redis_port
...@@ -360,7 +361,7 @@ class Connection(object): ...@@ -360,7 +361,7 @@ class Connection(object):
with gevent.Timeout(timeout, RuntimeError("Can't get configuration file")): with gevent.Timeout(timeout, RuntimeError("Can't get configuration file")):
with self.WaitingQueue(self) as wq: with self.WaitingQueue(self) as wq:
msg = b"%s|%s" % (wq.message_key(), file_path.encode()) msg = b"%s|%s" % (wq.message_key(), file_path.encode())
self._fd.sendall(protocol.message(protocol.CONFIG_GET_FILE, msg)) self._socket.sendall(protocol.message(protocol.CONFIG_GET_FILE, msg))
value = wq.get() value = wq.get()
if isinstance(value, RuntimeError): if isinstance(value, RuntimeError):
raise value raise value
...@@ -372,7 +373,7 @@ class Connection(object): ...@@ -372,7 +373,7 @@ class Connection(object):
with gevent.Timeout(timeout, RuntimeError("Can't get configuration tree")): with gevent.Timeout(timeout, RuntimeError("Can't get configuration tree")):
with self.WaitingQueue(self) as wq: with self.WaitingQueue(self) as wq:
msg = b"%s|%s" % (wq.message_key(), base_path.encode()) msg = b"%s|%s" % (wq.message_key(), base_path.encode())
self._fd.sendall(protocol.message(protocol.CONFIG_GET_DB_TREE, msg)) self._socket.sendall(protocol.message(protocol.CONFIG_GET_DB_TREE, msg))
value = wq.get() value = wq.get()
if isinstance(value, RuntimeError): if isinstance(value, RuntimeError):
raise value raise value
...@@ -386,7 +387,7 @@ class Connection(object): ...@@ -386,7 +387,7 @@ class Connection(object):
with gevent.Timeout(timeout, RuntimeError("Can't remove configuration file")): with gevent.Timeout(timeout, RuntimeError("Can't remove configuration file")):
with self.WaitingQueue(self) as wq: with self.WaitingQueue(self) as wq:
msg = b"%s|%s" % (wq.message_key(), file_path.encode()) msg = b"%s|%s" % (wq.message_key(), file_path.encode())
self._fd.sendall(protocol.message(protocol.CONFIG_REMOVE_FILE, msg)) self._socket.sendall(protocol.message(protocol.CONFIG_REMOVE_FILE, msg))
for rx_msg in wq.queue(): for rx_msg in wq.queue():
print(rx_msg) print(rx_msg)
...@@ -399,7 +400,7 @@ class Connection(object): ...@@ -399,7 +400,7 @@ class Connection(object):
src_path.encode(), src_path.encode(),
dst_path.encode(), dst_path.encode(),
) )
self._fd.sendall(protocol.message(protocol.CONFIG_MOVE_PATH, msg)) self._socket.sendall(protocol.message(protocol.CONFIG_MOVE_PATH, msg))
for rx_msg in wq.queue(): for rx_msg in wq.queue():
print(rx_msg) print(rx_msg)
...@@ -409,7 +410,7 @@ class Connection(object): ...@@ -409,7 +410,7 @@ class Connection(object):
with gevent.Timeout(timeout, RuntimeError("Can't get configuration file")): with gevent.Timeout(timeout, RuntimeError("Can't get configuration file")):
with self.WaitingQueue(self) as wq: with self.WaitingQueue(self) as wq:
msg = b"%s|%s" % (wq.message_key(), base_path.encode()) msg = b"%s|%s" % (wq.message_key(), base_path.encode())
self._fd.sendall( self._socket.sendall(
protocol.message(protocol.CONFIG_GET_DB_BASE_PATH, msg) protocol.message(protocol.CONFIG_GET_DB_BASE_PATH, msg)
) )
for rx_msg in wq.queue(): for rx_msg in wq.queue():
...@@ -430,7 +431,7 @@ class Connection(object): ...@@ -430,7 +431,7 @@ class Connection(object):
file_path.encode(), file_path.encode(),
content.encode(), content.encode(),
) )
self._fd.sendall(protocol.message(protocol.CONFIG_SET_DB_FILE, msg)) self._socket.sendall(protocol.message(protocol.CONFIG_SET_DB_FILE, msg))
for rx_msg in wq.queue(): for rx_msg in wq.queue():
raise rx_msg raise rx_msg
...@@ -440,7 +441,7 @@ class Connection(object): ...@@ -440,7 +441,7 @@ class Connection(object):
with gevent.Timeout(timeout, RuntimeError("Can't get python modules")): with gevent.Timeout(timeout, RuntimeError("Can't get python modules")):
with self.WaitingQueue(self) as wq: with self.WaitingQueue(self) as wq:
msg = b"%s|%s" % (wq.message_key(), base_path.encode()) msg = b"%s|%s" % (wq.message_key(), base_path.encode())
self._fd.sendall( self._socket.sendall(
protocol.message(protocol.CONFIG_GET_PYTHON_MODULE, msg) protocol.message(protocol.CONFIG_GET_PYTHON_MODULE, msg)
) )
for rx_msg in wq.queue(): for rx_msg in wq.queue():
...@@ -490,7 +491,7 @@ class Connection(object): ...@@ -490,7 +491,7 @@ class Connection(object):
try: try:
data = b"" data = b""
while True: while True:
raw_data = self._fd.recv(16 * 1024) raw_data = self._socket.recv(16 * 1024)
if not raw_data: if not raw_data:
break break
data = b"%s%s" % (data, raw_data) data = b"%s%s" % (data, raw_data)
...@@ -501,7 +502,7 @@ class Connection(object): ...@@ -501,7 +502,7 @@ class Connection(object):
break break
try: try:
# print 'rx',messageType # print 'rx',messageType
if self._lock_mgt(self._fd, messageType, message): if self._lock_mgt(self._socket, messageType, message):
continue continue
elif messageType in ( elif messageType in (
protocol.CONFIG_GET_FILE_OK, protocol.CONFIG_GET_FILE_OK,
...@@ -545,12 +546,12 @@ class Connection(object): ...@@ -545,12 +546,12 @@ class Connection(object):
elif messageType == protocol.UDS_OK: elif messageType == protocol.UDS_OK:
try: try:
uds_path = message.decode() uds_path = message.decode()
fd = self._uds_connect(uds_path) sock = self._uds_connect(uds_path)
except socket.error: except socket.error:
raise raise
else: else:
self._fd.close() self._socket.close()
self._fd = fd self._socket = sock
self._port = uds_path self._port = uds_path
finally: finally:
self._uds_query_event.set() self._uds_query_event.set()
...@@ -571,9 +572,9 @@ class Connection(object): ...@@ -571,9 +572,9 @@ class Connection(object):
except: except:
sys.excepthook(*sys.exc_info()) sys.excepthook(*sys.exc_info())
finally: finally:
if self._fd: if self._socket:
self._fd.close() self._socket.close()
self._fd = None self._socket = None
self._clean() self._clean()
def _clean(self): def _clean(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment