Commit 1caf9975 authored by Sebastien Petitdemange's avatar Sebastien Petitdemange

Merge branch '223-beacon-does-not-respond-to-concurrent-client-broadcast-requests' into 'master'

Resolve "Beacon does not respond to concurrent client broadcast requests"

Closes #223

See merge request !566
parents d0630127 ddd12a70
Pipeline #1791 passed with stages
in 5 minutes and 26 seconds
......@@ -165,9 +165,7 @@ class Connection(object):
#try to find the server on the same sub-net
if host is None or port is None:
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
udp.bind(("",protocol.DEFAULT_UDP_CLIENT_PORT))
# go through all interfaces, and issue broadcast on each
for addr in ip4_broadcast_addresses(host is None):
udp.sendto('Hello',(addr,protocol.DEFAULT_UDP_SERVER_PORT))
......
......@@ -12,6 +12,7 @@ import os
import sys
import codecs
import shutil
import logging
import argparse
import weakref
import subprocess
......@@ -65,7 +66,10 @@ _options = None
_lock_object = {}
_client_to_object = weakref.WeakKeyDictionary()
_waiting_lock = weakref.WeakKeyDictionary()
_log = logging.getLogger('beacon')
_tlog = _log.getChild('tango')
_rlog = _log.getChild('redis')
_wlog = _log.getChild('web')
# Helpers
......@@ -157,7 +161,7 @@ def _lock(client_id,prio,lock_obj,raw_message) :
with _WaitStolenReply(stolen_lock) as w:
w.wait(3.)
except RuntimeError:
print "Warning: some client(s) didn't reply to the stolen lock"
_log.warning("some client(s) didn't reply to the stolen lock")
obj_already_locked = _client_to_object.get(client_id,set())
_client_to_object[client_id] = set(lock_obj).union(obj_already_locked)
......@@ -542,7 +546,7 @@ def _client_rx(client,local_connection):
break
except:
sys.excepthook(*sys.exc_info())
print 'Error with client id %s, close it' % client
_log.error('Error with client id %r, close it', client)
raise
if fd == client:
......@@ -566,14 +570,14 @@ def start_webserver(webapp_port, beacon_port, debug=True):
try:
import flask
except ImportError:
print "[WEB] flask cannot be imported: web application won't be available"
_wlog.error("flask cannot be imported: web application won't be available")
return
from gevent.wsgi import WSGIServer
from werkzeug.debug import DebuggedApplication
from .web.config_app import web_app
print "[WEB] Web application sitting on port:", webapp_port
_wlog.info("Web application sitting on port: %s", webapp_port)
web_app.debug = debug
web_app.beacon_port = beacon_port
http_server = WSGIServer(('', webapp_port), DebuggedApplication(web_app, evalex=True))
......@@ -611,9 +615,17 @@ def main(args=None):
help="web server port (default to 0: disable)")
parser.add_argument("--redis_socket", dest="redis_socket", default="/tmp/redis.sock",
help="Unix socket for redis (default to /tmp/redis.sock)")
parser.add_argument('--log_level', default='INFO', type=str,
help='log level',
choices=['DEBUG', 'INFO', 'WARN', 'ERROR'])
global _options
_options = parser.parse_args(args)
log_level = _options.log_level.upper()
log_fmt = '%(levelname)s %(asctime)-15s %(name)s: %(message)s'
logging.basicConfig(level=log_level, format=log_fmt)
# Binds system signals.
signal.signal(signal.SIGTERM, sigterm_handler)
signal.signal(signal.SIGINT, sigterm_handler)
......@@ -640,13 +652,13 @@ def main(args=None):
tcp.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
tcp.bind(("",_options.port))
beacon_port = tcp.getsockname()[1]
print "[beacon] server sitting on port:", beacon_port
print "[beacon] configuration path:", _options.db_path
_log.info("server sitting on port: %s", beacon_port)
_log.info("configuration path: %s", _options.db_path)
tcp.listen(512) # limit to 512 clients
#Tango databaseds
if _options.tango_port > 0:
print '[TANGO] Database started on port:',_options.tango_port
_tlog.info('Database started on port:',_options.tango_port)
tango_rp,tango_wp = os.pipe()
child_pid = os.fork()
if child_pid == 0:
......@@ -682,11 +694,12 @@ def main(args=None):
fd_list = [udp,tcp,rp,sig_read]
if tango_rp:
fd_list.append(tango_rp)
msg_prefix = {tango_rp:'[TANGO]',
rp:'[REDIS]'}
logger = {tango_rp: _tlog,
rp: _rlog}
bosse = True
udp_reply = '%s|%d' % (socket.gethostname(),beacon_port)
while bosse:
rlist,_,_ = select.select(fd_list,[],[], 1)
......@@ -695,7 +708,8 @@ def main(args=None):
if s == udp:
buff,address = udp.recvfrom(8192)
if buff.find('Hello') > -1:
udp.sendto('%s|%d' % (socket.gethostname(),beacon_port),address)
_log.info('address request from %s. Replying with %r', address, udp_reply)
udp.sendto(udp_reply,address)
elif s == tcp:
newSocket, addr = tcp.accept()
......@@ -710,17 +724,17 @@ def main(args=None):
else:
msg = os.read(s,8192)
if msg:
print '%s: %s' % (msg_prefix.get(s,'[DEFAULT]'),msg)
logger.get(s, _log).info(msg)
else:
fd_list.remove(tango_rp)
os.close(tango_rp)
print '%s: Warning: Database exit' % (msg_prefix.get(s,'[DEFAULT]'))
logger.get(s, _log).warning('database exit')
break
else:
# Check if redis is alive
redis_exit_code = redis_process.poll()
if redis_exit_code is not None:
print '[REDIS]: Error: redis exited with code %s. Bailing out!' % (redis_exit_code)
_rlog.critical('redis exited with code %s. Bailing out!', redis_exit_code)
redis_process = None
bosse = False
except KeyboardInterrupt:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment