Added an event queue so that emitted events that failed can be automatically retried upon reconnection. Also fixed a potential infinite loop bug wrt reconnecting automatically

This commit is contained in:
Sean Arietta 2014-12-23 21:38:42 -08:00
commit a1f37e8e60

View file

@ -38,9 +38,8 @@ class BaseNamespace(object):
'Initialize custom variables here; you can override this method'
pass
def emit(self, event, *args, **kw):
callback, args = find_callback(args, kw)
self._client.emit(self.path, event, args, callback)
def emit(self, event_name, *args, **kw):
self._client.emit(event_name, path = self.path, *args, **kw)
def disconnect(self):
self._client.disconnect(self.path)
@ -110,6 +109,15 @@ class BaseNamespace(object):
'on_' + event.replace(' ', '_'),
lambda *args: self.on_event(event, *args))
class SocketIOEvent(object):
def __init__(self, path, name, args, callback):
self.path = path;
self.name = name;
self.args = args;
self.callback = callback;
def __str__(self):
return str(self.path) + "/" + str(self.name) + "(" + str(self.args) + ")(" + str(self.callback) + ")";
class SocketIO(object):
"""Create a socket.io client that connects to a socket.io server
@ -152,6 +160,12 @@ class SocketIO(object):
# This sets of a chain of events that attempts to connect to
# the server at the base namespace.
self.define(Namespace)
self._transport.connect("");
# Events that fail to emit due to connection errors will be
# placed in this 'queue' and re-sent automatically upon
# reconnect.
self._event_retry_queue = [];
def __enter__(self):
_log.debug("[enter]");
@ -182,11 +196,24 @@ class SocketIO(object):
def on(self, event, callback, path=''):
return self.get_namespace(path).on(event, callback)
def emit(self, event, *args, **kw):
def _emit_event(self, event):
"""Emits an Emittable object.
This function enables automatically re-emitting events that
failed due to connection errors.
"""
self._transport.emit(event.path, event.name, event.args, event.callback);
def emit(self, event_name, *args, **kw):
path = kw.get('path', '')
callback, args = find_callback(args, kw)
event = SocketIOEvent(path, event_name, args, callback);
self._event_retry_queue.append(event);
try:
self._transport.emit(path, event, args, callback);
#import ipdb; ipdb.set_trace();
self._emit_event(event);
self._event_retry_queue.pop();
except ConnectionError as e:
_log.warn("[emit] Connection error: %s" % str(e));
self._handle_severed_connection();
@ -205,6 +232,11 @@ class SocketIO(object):
self.__transport.close();
self.__transport = None;
# We call the _create_transport directly ahead of the loop
# below so that self._tranport will not result in an infinite
# loop.
self._create_transport();
for path in self.reconnect_paths:
# We avoid reconnecting to the default namespace because
# socketIO_client connects to that already.
@ -216,6 +248,11 @@ class SocketIO(object):
self._namespace_by_path = copy.copy(self.reconnect_paths);
self.reconnect_paths = {};
# Send any pending events.
for event in self._event_retry_queue:
_log.debug("[reconnect] Re-emitting event: %s" % str(event));
self._emit_event(event);
def _handle_severed_connection(self):
"""Handles severed (unexpectedly terminated) connections
@ -225,9 +262,10 @@ class SocketIO(object):
self.reconnect_paths = copy.copy(self._namespace_by_path);
self._terminate_heartbeat();
self.__transport = None;
for namespace in self.reconnect_paths:
self.disconnect(namespace);
self.disconnect(namespace, skip_transport_disconnect = True);
def wait(self, seconds=None, for_callbacks=False):
"""Wait in a loop and process events as defined in the namespaces.
@ -235,16 +273,7 @@ class SocketIO(object):
- Omit seconds, i.e. call wait() without arguments, to wait forever.
"""
warning_screen = _yield_warning_screen(seconds)
for elapsed_time in warning_screen:
# We will end up here in the case that we
# disconnected.
if len(self.reconnect_paths) > 0:
try:
self.reconnect();
except ConnectionError as e:
time.sleep(RETRY_INTERVAL_IN_SECONDS);
continue;
for elapsed_time in warning_screen:
try:
try:
self._process_events()
@ -329,15 +358,15 @@ class SocketIO(object):
def wait_for_callbacks(self, seconds=None):
self.wait(seconds, for_callbacks=True)
def disconnect(self, path=''):
if self.connected:
def disconnect(self, path='', skip_transport_disconnect = False):
if self.connected and not skip_transport_disconnect:
try:
self._transport.disconnect(path)
except ConnectionError as e:
_log.warn("[disconnect] Connection error: %s" % str(e));
namespace = self._namespace_by_path[path]
namespace.on_disconnect()
namespace = self._namespace_by_path[path]
namespace.on_disconnect()
if path:
del self._namespace_by_path[path]
@ -345,6 +374,10 @@ class SocketIO(object):
def connected(self):
return self.__transport.connected if self.__transport is not None else False;
def _create_transport(self):
_log.debug("[create transport]");
self.__transport = self._get_transport();
@property
def _transport(self):
try:
@ -355,8 +388,7 @@ class SocketIO(object):
warning_screen = _yield_warning_screen(seconds=None)
for elapsed_time in warning_screen:
try:
_log.debug("[create transport]");
self.__transport = self._get_transport()
self._create_transport();
break
except ConnectionError as e:
if not self.wait_for_connection:
@ -366,6 +398,17 @@ class SocketIO(object):
warning_screen.throw(warning)
except StopIteration:
_log.warn(warning)
continue;
# If we disconnected before, self.reconnected_paths will be
# non-empty.
while len(self.reconnect_paths) > 0:
try:
self.reconnect();
except ConnectionError as e:
time.sleep(RETRY_INTERVAL_IN_SECONDS);
continue;
return self.__transport
def _upgrade(self):