Added connection error handling for all send / recv related methods so that we can reconnect on all connection errors

This commit is contained in:
Sean Arietta 2014-12-23 19:15:50 -08:00
commit 6932d4c2d1

View file

@ -28,8 +28,8 @@ RETRY_INTERVAL_IN_SECONDS = 1
class BaseNamespace(object):
'Define client behavior'
def __init__(self, _transport, path):
self._transport = _transport
def __init__(self, client, path):
self._client = client;
self.path = path
self._callback_by_event = {}
self.initialize()
@ -38,15 +38,12 @@ class BaseNamespace(object):
'Initialize custom variables here; you can override this method'
pass
def message(self, data='', callback=None):
self._transport.message(self.path, data, callback)
def emit(self, event, *args, **kw):
callback, args = find_callback(args, kw)
self._transport.emit(self.path, event, args, callback)
self._client.emit(self.path, event, args, callback)
def disconnect(self):
self._transport.disconnect(self.path)
self._client.disconnect(self.path)
def on(self, event, callback):
'Define a callback to handle a custom event emitted by the server'
@ -168,31 +165,31 @@ class SocketIO(object):
self.disconnect()
self._terminate_heartbeat();
def _terminate_heartbeat(self):
"""Terminates the heartbeat thread.
"""
if self.heartbeat_terminator is not None:
self.heartbeat_terminator.set();
self.heartbeat_thread.join();
def define(self, Namespace, path=''):
if path:
self._transport.connect(path)
namespace = Namespace(self._transport, path)
_log.debug("[define] Path: %s" % path);
namespace = Namespace(self, path)
self._namespace_by_path[path] = namespace
if path:
try:
self._transport.connect(path);
except ConnectionError as e:
_log.warn("[define] Connection error: %s" % str(e));
self._handle_severed_connection();
return namespace
def on(self, event, callback, path=''):
return self.get_namespace(path).on(event, callback)
def message(self, data='', callback=None, path=''):
self._transport.message(path, data, callback)
def emit(self, event, *args, **kw):
path = kw.get('path', '')
callback, args = find_callback(args, kw)
self._transport.emit(path, event, args, callback)
try:
self._transport.emit(path, event, args, callback);
except ConnectionError as e:
_log.warn("[emit] Connection error: %s" % str(e));
self._handle_severed_connection();
def reconnect(self):
"""Reconnects the client.
@ -205,8 +202,8 @@ class SocketIO(object):
# Reconnect to the server.
if self.__transport is not None:
self.__transport.close();
self.__transport = self._get_transport();
self.__transport.close();
self.__transport = None;
for path in self.reconnect_paths:
# We avoid reconnecting to the default namespace because
@ -217,10 +214,21 @@ class SocketIO(object):
self._transport.connect(path);
# Restore paths.
self._namespace_by_path = copy.copy(self.reconnect_paths);
for namespace in self._namespace_by_path:
self._namespace_by_path[namespace]._transport = self.__transport;
self.reconnect_paths = {};
def _handle_severed_connection(self):
"""Handles severed (unexpectedly terminated) connections
"""
self._terminate_heartbeat();
if len(self.reconnect_paths) is 0:
self.reconnect_paths = copy.copy(self._namespace_by_path);
self._terminate_heartbeat();
for namespace in self.reconnect_paths:
self.disconnect(namespace);
def wait(self, seconds=None, for_callbacks=False):
"""Wait in a loop and process events as defined in the namespaces.
@ -247,16 +255,7 @@ class SocketIO(object):
except ConnectionError as e:
try:
# This is where we end up if the connection was
# severed. The client will disconnect here.
if len(self.reconnect_paths) is 0:
self.reconnect_paths = copy.copy(self._namespace_by_path);
self._terminate_heartbeat();
for namespace in self.reconnect_paths:
self.disconnect(namespace);
self._handle_severed_connection();
warning = Exception('[connection error] %s' % e)
warning_screen.throw(warning)
except StopIteration:
@ -332,7 +331,11 @@ class SocketIO(object):
def disconnect(self, path=''):
if self.connected:
self._transport.disconnect(path)
try:
self._transport.disconnect(path)
except ConnectionError as e:
_log.warn("[disconnect] Connection error: %s" % str(e));
namespace = self._namespace_by_path[path]
namespace.on_disconnect()
if path:
@ -340,7 +343,7 @@ class SocketIO(object):
@property
def connected(self):
return self.__transport.connected if self.__transport is not None else False;
return self.__transport.connected if self.__transport is not None else False;
@property
def _transport(self):
@ -399,6 +402,14 @@ class SocketIO(object):
self._start_heartbeat(websocket);
return websocket;
def _terminate_heartbeat(self):
"""Terminates the heartbeat thread.
"""
if self.heartbeat_terminator is not None:
self.heartbeat_terminator.set();
self.heartbeat_thread.join();
def _start_heartbeat(self, transport):
"""Starts the heartbeat thread.
@ -413,6 +424,11 @@ class SocketIO(object):
it's waiting.
"""
if self.heartbeat_thread is not None and not self.heartbeat_terminator.is_set():
_log.warn("[start hearbeat] heartbeat already started... terminating old heartbeat");
self._terminate_heartbeat();
_log.debug("[start heartbeat pacemaker]");
self.heartbeat_terminator = multiprocessing.Event();
self.heartbeat_thread = multiprocessing.Process(
@ -542,8 +558,14 @@ class SocketIO(object):
def _prepare_to_send_ack(self, path, packet_id):
'Return function that acknowledges the server'
return lambda *args: self._transport.ack(path, packet_id, *args)
return lambda *args: _send_ack(self, path, packet_id, *args);
def _send_ack(socketio, path, packet_id, *args):
try:
socketio._transport.ack(path, packet_id, *args);
except ConnectionError as e:
_log.warn("[send ack] Connection error: %s" % str(e));
socketio._handle_severed_connection();
def find_callback(args, kw=None):
'Return callback whether passed as a last argument or as a keyword'
@ -618,6 +640,9 @@ def _make_heartbeat_pacemaker(terminator, transport, heartbeat_interval):
_log.debug("[hearbeat]");
try:
transport.send_heartbeat();
except requests.exceptions.ConnectionError as e:
message = "[heartbeat] disconnected while sending PING";
_log.warn(message);
except:
pass;
_log.debug("[heartbeat terminated]");