diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 5026cb9..6c5b41c 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -38,9 +38,8 @@ class BaseNamespace(object): 'Initialize custom variables here; you can override this method' pass - def emit(self, event, *args, **kw): - callback, args = find_callback(args, kw) - self._client.emit(self.path, event, args, callback) + def emit(self, event_name, *args, **kw): + self._client.emit(event_name, path = self.path, *args, **kw) def disconnect(self): self._client.disconnect(self.path) @@ -110,6 +109,15 @@ class BaseNamespace(object): 'on_' + event.replace(' ', '_'), lambda *args: self.on_event(event, *args)) +class SocketIOEvent(object): + def __init__(self, path, name, args, callback): + self.path = path; + self.name = name; + self.args = args; + self.callback = callback; + + def __str__(self): + return str(self.path) + "/" + str(self.name) + "(" + str(self.args) + ")(" + str(self.callback) + ")"; class SocketIO(object): """Create a socket.io client that connects to a socket.io server @@ -152,6 +160,12 @@ class SocketIO(object): # This sets of a chain of events that attempts to connect to # the server at the base namespace. self.define(Namespace) + self._transport.connect(""); + + # Events that fail to emit due to connection errors will be + # placed in this 'queue' and re-sent automatically upon + # reconnect. + self._event_retry_queue = []; def __enter__(self): _log.debug("[enter]"); @@ -182,11 +196,24 @@ class SocketIO(object): def on(self, event, callback, path=''): return self.get_namespace(path).on(event, callback) - def emit(self, event, *args, **kw): + def _emit_event(self, event): + """Emits an Emittable object. + + This function enables automatically re-emitting events that + failed due to connection errors. + + """ + self._transport.emit(event.path, event.name, event.args, event.callback); + + def emit(self, event_name, *args, **kw): path = kw.get('path', '') callback, args = find_callback(args, kw) + event = SocketIOEvent(path, event_name, args, callback); + self._event_retry_queue.append(event); try: - self._transport.emit(path, event, args, callback); + #import ipdb; ipdb.set_trace(); + self._emit_event(event); + self._event_retry_queue.pop(); except ConnectionError as e: _log.warn("[emit] Connection error: %s" % str(e)); self._handle_severed_connection(); @@ -205,6 +232,11 @@ class SocketIO(object): self.__transport.close(); self.__transport = None; + # We call the _create_transport directly ahead of the loop + # below so that self._tranport will not result in an infinite + # loop. + self._create_transport(); + for path in self.reconnect_paths: # We avoid reconnecting to the default namespace because # socketIO_client connects to that already. @@ -216,6 +248,11 @@ class SocketIO(object): self._namespace_by_path = copy.copy(self.reconnect_paths); self.reconnect_paths = {}; + # Send any pending events. + for event in self._event_retry_queue: + _log.debug("[reconnect] Re-emitting event: %s" % str(event)); + self._emit_event(event); + def _handle_severed_connection(self): """Handles severed (unexpectedly terminated) connections @@ -225,9 +262,10 @@ class SocketIO(object): self.reconnect_paths = copy.copy(self._namespace_by_path); self._terminate_heartbeat(); + self.__transport = None; for namespace in self.reconnect_paths: - self.disconnect(namespace); + self.disconnect(namespace, skip_transport_disconnect = True); def wait(self, seconds=None, for_callbacks=False): """Wait in a loop and process events as defined in the namespaces. @@ -235,16 +273,7 @@ class SocketIO(object): - Omit seconds, i.e. call wait() without arguments, to wait forever. """ warning_screen = _yield_warning_screen(seconds) - for elapsed_time in warning_screen: - # We will end up here in the case that we - # disconnected. - if len(self.reconnect_paths) > 0: - try: - self.reconnect(); - except ConnectionError as e: - time.sleep(RETRY_INTERVAL_IN_SECONDS); - continue; - + for elapsed_time in warning_screen: try: try: self._process_events() @@ -329,15 +358,15 @@ class SocketIO(object): def wait_for_callbacks(self, seconds=None): self.wait(seconds, for_callbacks=True) - def disconnect(self, path=''): - if self.connected: + def disconnect(self, path='', skip_transport_disconnect = False): + if self.connected and not skip_transport_disconnect: try: self._transport.disconnect(path) except ConnectionError as e: _log.warn("[disconnect] Connection error: %s" % str(e)); - namespace = self._namespace_by_path[path] - namespace.on_disconnect() + namespace = self._namespace_by_path[path] + namespace.on_disconnect() if path: del self._namespace_by_path[path] @@ -345,6 +374,10 @@ class SocketIO(object): def connected(self): return self.__transport.connected if self.__transport is not None else False; + def _create_transport(self): + _log.debug("[create transport]"); + self.__transport = self._get_transport(); + @property def _transport(self): try: @@ -355,8 +388,7 @@ class SocketIO(object): warning_screen = _yield_warning_screen(seconds=None) for elapsed_time in warning_screen: try: - _log.debug("[create transport]"); - self.__transport = self._get_transport() + self._create_transport(); break except ConnectionError as e: if not self.wait_for_connection: @@ -366,6 +398,17 @@ class SocketIO(object): warning_screen.throw(warning) except StopIteration: _log.warn(warning) + + continue; + # If we disconnected before, self.reconnected_paths will be + # non-empty. + while len(self.reconnect_paths) > 0: + try: + self.reconnect(); + except ConnectionError as e: + time.sleep(RETRY_INTERVAL_IN_SECONDS); + continue; + return self.__transport def _upgrade(self):