diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index a2b6c8c..5026cb9 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -28,8 +28,8 @@ RETRY_INTERVAL_IN_SECONDS = 1 class BaseNamespace(object): 'Define client behavior' - def __init__(self, _transport, path): - self._transport = _transport + def __init__(self, client, path): + self._client = client; self.path = path self._callback_by_event = {} self.initialize() @@ -38,15 +38,12 @@ class BaseNamespace(object): 'Initialize custom variables here; you can override this method' pass - def message(self, data='', callback=None): - self._transport.message(self.path, data, callback) - def emit(self, event, *args, **kw): callback, args = find_callback(args, kw) - self._transport.emit(self.path, event, args, callback) + self._client.emit(self.path, event, args, callback) def disconnect(self): - self._transport.disconnect(self.path) + self._client.disconnect(self.path) def on(self, event, callback): 'Define a callback to handle a custom event emitted by the server' @@ -168,31 +165,31 @@ class SocketIO(object): self.disconnect() self._terminate_heartbeat(); - def _terminate_heartbeat(self): - """Terminates the heartbeat thread. - - """ - if self.heartbeat_terminator is not None: - self.heartbeat_terminator.set(); - self.heartbeat_thread.join(); - def define(self, Namespace, path=''): - if path: - self._transport.connect(path) - namespace = Namespace(self._transport, path) + _log.debug("[define] Path: %s" % path); + namespace = Namespace(self, path) self._namespace_by_path[path] = namespace + + if path: + try: + self._transport.connect(path); + except ConnectionError as e: + _log.warn("[define] Connection error: %s" % str(e)); + self._handle_severed_connection(); + return namespace def on(self, event, callback, path=''): return self.get_namespace(path).on(event, callback) - def message(self, data='', callback=None, path=''): - self._transport.message(path, data, callback) - def emit(self, event, *args, **kw): path = kw.get('path', '') callback, args = find_callback(args, kw) - self._transport.emit(path, event, args, callback) + try: + self._transport.emit(path, event, args, callback); + except ConnectionError as e: + _log.warn("[emit] Connection error: %s" % str(e)); + self._handle_severed_connection(); def reconnect(self): """Reconnects the client. @@ -205,8 +202,8 @@ class SocketIO(object): # Reconnect to the server. if self.__transport is not None: - self.__transport.close(); - self.__transport = self._get_transport(); + self.__transport.close(); + self.__transport = None; for path in self.reconnect_paths: # We avoid reconnecting to the default namespace because @@ -217,10 +214,21 @@ class SocketIO(object): self._transport.connect(path); # Restore paths. self._namespace_by_path = copy.copy(self.reconnect_paths); - for namespace in self._namespace_by_path: - self._namespace_by_path[namespace]._transport = self.__transport; self.reconnect_paths = {}; + def _handle_severed_connection(self): + """Handles severed (unexpectedly terminated) connections + + """ + self._terminate_heartbeat(); + if len(self.reconnect_paths) is 0: + self.reconnect_paths = copy.copy(self._namespace_by_path); + + self._terminate_heartbeat(); + + for namespace in self.reconnect_paths: + self.disconnect(namespace); + def wait(self, seconds=None, for_callbacks=False): """Wait in a loop and process events as defined in the namespaces. @@ -247,16 +255,7 @@ class SocketIO(object): except ConnectionError as e: try: - # This is where we end up if the connection was - # severed. The client will disconnect here. - if len(self.reconnect_paths) is 0: - self.reconnect_paths = copy.copy(self._namespace_by_path); - - self._terminate_heartbeat(); - - for namespace in self.reconnect_paths: - self.disconnect(namespace); - + self._handle_severed_connection(); warning = Exception('[connection error] %s' % e) warning_screen.throw(warning) except StopIteration: @@ -332,7 +331,11 @@ class SocketIO(object): def disconnect(self, path=''): if self.connected: - self._transport.disconnect(path) + try: + self._transport.disconnect(path) + except ConnectionError as e: + _log.warn("[disconnect] Connection error: %s" % str(e)); + namespace = self._namespace_by_path[path] namespace.on_disconnect() if path: @@ -340,7 +343,7 @@ class SocketIO(object): @property def connected(self): - return self.__transport.connected if self.__transport is not None else False; + return self.__transport.connected if self.__transport is not None else False; @property def _transport(self): @@ -399,6 +402,14 @@ class SocketIO(object): self._start_heartbeat(websocket); return websocket; + def _terminate_heartbeat(self): + """Terminates the heartbeat thread. + + """ + if self.heartbeat_terminator is not None: + self.heartbeat_terminator.set(); + self.heartbeat_thread.join(); + def _start_heartbeat(self, transport): """Starts the heartbeat thread. @@ -413,6 +424,11 @@ class SocketIO(object): it's waiting. """ + + if self.heartbeat_thread is not None and not self.heartbeat_terminator.is_set(): + _log.warn("[start hearbeat] heartbeat already started... terminating old heartbeat"); + self._terminate_heartbeat(); + _log.debug("[start heartbeat pacemaker]"); self.heartbeat_terminator = multiprocessing.Event(); self.heartbeat_thread = multiprocessing.Process( @@ -542,8 +558,14 @@ class SocketIO(object): def _prepare_to_send_ack(self, path, packet_id): 'Return function that acknowledges the server' - return lambda *args: self._transport.ack(path, packet_id, *args) + return lambda *args: _send_ack(self, path, packet_id, *args); +def _send_ack(socketio, path, packet_id, *args): + try: + socketio._transport.ack(path, packet_id, *args); + except ConnectionError as e: + _log.warn("[send ack] Connection error: %s" % str(e)); + socketio._handle_severed_connection(); def find_callback(args, kw=None): 'Return callback whether passed as a last argument or as a keyword' @@ -618,6 +640,9 @@ def _make_heartbeat_pacemaker(terminator, transport, heartbeat_interval): _log.debug("[hearbeat]"); try: transport.send_heartbeat(); + except requests.exceptions.ConnectionError as e: + message = "[heartbeat] disconnected while sending PING"; + _log.warn(message); except: pass; _log.debug("[heartbeat terminated]");