diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 05492d9..30a8643 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -145,19 +145,26 @@ class SocketIO(object): self.wait_for_connection = wait_for_connection self._namespace_by_path = {} self.kw = kw + + self.__transport = None; + # These two fields work to control the heartbeat thread. self.heartbeat_terminator = None; self.heartbeat_thread = None; + # Saved session information. self.session = None; + # This is stores the set of paths (namespaces) that need to be # reconnected to. self.reconnect_paths = {}; + # This sets of a chain of events that attempts to connect to # the server at the base namespace. self.define(Namespace) def __enter__(self): + _log.debug("[enter]"); return self def __exit__(self, *exception_pack): @@ -193,9 +200,19 @@ class SocketIO(object): self._transport.emit(path, event, args, callback) def reconnect(self): - """Reconnects to a set of namespaces. + """Reconnects the client. + + Reconnects to the server and connects to the previously + connected set of namespaces. """ + _log.debug(" [reconnect attempt]"); + + # Reconnect to the server. + if self.__transport is not None: + self.__transport.close(); + self.__transport = self._get_transport(); + for path in self.reconnect_paths: # We avoid reconnecting to the default namespace because # socketIO_client connects to that already. @@ -203,6 +220,10 @@ class SocketIO(object): continue; _log.debug("Reconnecting to path: %s" % repr(path)) self._transport.connect(path); + # Restore paths. + self._namespace_by_path = copy.copy(self.reconnect_paths); + for namespace in self._namespace_by_path: + self._namespace_by_path[namespace]._transport = self.__transport; self.reconnect_paths = {}; def wait(self, seconds=None, for_callbacks=False): @@ -212,6 +233,15 @@ class SocketIO(object): """ warning_screen = _yield_warning_screen(seconds) for elapsed_time in warning_screen: + # We will end up here in the case that we + # disconnected. + if len(self.reconnect_paths) > 0: + try: + self.reconnect(); + except ConnectionError as e: + time.sleep(1); + continue; + try: try: self._process_events() @@ -220,26 +250,23 @@ class SocketIO(object): if self._stop_waiting(for_callbacks): break - # We will end up here in the case that we - # disconnected, then reconnected AND we were - # successful. - if len(self.reconnect_paths) > 0: - self.reconnect(); except ConnectionError as e: try: # This is where we end up if the connection was # severed. The client will disconnect here. if len(self.reconnect_paths) is 0: - self.reconnect_paths = copy.deepcopy(self._namespace_by_path); + self.reconnect_paths = copy.copy(self._namespace_by_path); - self._terminate_heartbeat(); + self._terminate_heartbeat(); + + for namespace in self.reconnect_paths: + self.disconnect(namespace); warning = Exception('[connection error] %s' % e) - self._transport._connected = False; warning_screen.throw(warning) except StopIteration: _log.warn(warning) - self.disconnect() + _log.debug("[wait canceled]"); def _process_events(self): @@ -317,18 +344,19 @@ class SocketIO(object): @property def connected(self): - return self.__transport.connected + return self.__transport.connected if self.__transport is not None else False; @property def _transport(self): try: - if self.connected: + if self.__transport is not None and self.connected: return self.__transport except AttributeError: pass warning_screen = _yield_warning_screen(seconds=None) for elapsed_time in warning_screen: try: + _log.debug("[create transport]"); self.__transport = self._get_transport() break except ConnectionError as e: @@ -349,7 +377,7 @@ class SocketIO(object): if packet.type == PacketType.PONG: _log.debug("[PONG] %s" % repr(packet)); - self.heartbeat_terminator.set(); + self._terminate_heartbeat(); # Technically we would need to pause the current # transport (which should be polling in this @@ -391,6 +419,7 @@ class SocketIO(object): try: return self._upgrade(); except: + _log.warn("[websocket] Failed to upgrade to websocket") pass; return transport diff --git a/socketIO_client/parser.py b/socketIO_client/parser.py index 8fe06b1..39fcb3d 100644 --- a/socketIO_client/parser.py +++ b/socketIO_client/parser.py @@ -35,7 +35,7 @@ class Packet(): def encode_as_string(self, for_websocket = False): data = ""; path = ""; - if self.type == PacketType.MESSAGE: + if self.type == PacketType.MESSAGE and not isinstance(self.payload, basestring): data = self.payload.encode_as_string(); path = self.payload.path; else: @@ -230,8 +230,9 @@ def decode_packet(packet): message = decode_message(payload); payload = message; - return offset + length, Packet(packet_type, payload); + return offset + length - 1, Packet(packet_type, payload); else: + import ipdb; ipdb.set_trace(); pass; def encode_packet_string(code, path, data): diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 65be157..2c70cf4 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -41,7 +41,7 @@ class _AbstractTransport(object): self.close() def connect(self, path): - if path != "": + if True or path != "": _log.debug("Connecting to path: %s" % path); data = Message(MessageType.CONNECT, path).encode_as_string(); self.send_packet(PacketType.MESSAGE, path, data); @@ -50,11 +50,11 @@ class _AbstractTransport(object): responded = False; while not responded: for packet in self.recv_packet(): - _log.debug("[connect wait] Waiting for confirmation"); + _log.debug("[connect wait] Waiting for confirmation of connect to: %s" % path); if (packet.type == PacketType.MESSAGE and packet.payload.type == MessageType.CONNECT and packet.payload.path == path): - _log.debug("Connected to path: %s" % path); + _log.debug("[connect] Connected to path: %s" % path); responded = True; else: self.send_packet(PacketType.OPEN, path, data); @@ -78,7 +78,7 @@ class _AbstractTransport(object): _log.debug("[ack] Sending ACK for packet: %d" % packet_id); message = Message(MessageType.ACK, "", path, "", packet_id); packet = Packet(PacketType.MESSAGE, message); - self.send_engineio_packet(packet) + self.send_engineio_packet(packet); def noop(self, path=''): self.send_packet(PacketType.NOOP, path) @@ -86,7 +86,6 @@ class _AbstractTransport(object): def send_packet(self, code, path='', data='', callback=None): packet_text = Packet(code, data).encode_as_string(); self.send(packet_text) - _log.debug('[packet sent] %s', packet_text) def recv_packet(self): try: @@ -127,6 +126,7 @@ class WebsocketTransport(_AbstractTransport): base_url, parser.ENGINE_PROTOCOL, socketIO_session.id) try: + _log.debug("[websocket] Connecting"); self._connection = websocket.create_connection(self._url) except socket.timeout as e: raise ConnectionError(e) @@ -141,17 +141,16 @@ class WebsocketTransport(_AbstractTransport): def send_message(self, message, callback = None): packet_text = message.encode_as_string(); self.send(packet_text) - _log.debug('[packet sent] %s', packet_text) def send_engineio_packet(self, packet, callback=None): packet_text = packet.encode_as_string(for_websocket = True); self.send(packet_text) - _log.debug('[packet sent] %s', packet_text) def send_packet(self, code, path="", data='', callback=None): self.send_message(Message(code, data), callback); def send(self, packet_text): + _log.debug("[websocket] send: " + str(packet_text)); try: self._connection.send(packet_text) except websocket.WebSocketTimeoutException as e: @@ -186,7 +185,8 @@ class WebsocketTransport(_AbstractTransport): raise ConnectionError(e) def close(self): - self._connection.close() + self._connection.close(); + self._connected = False; class XHR_PollingTransport(_AbstractTransport): @@ -207,8 +207,9 @@ class XHR_PollingTransport(_AbstractTransport): @property def _params(self): return dict(t=int(time.time() * 1000)) - + def send(self, packet_text): + _log.debug("[xhr] send: " + str(packet_text)); uri = self._url + "&" + '&'.join("%s=%s" % (k, v) for (k, v) in self._params.iteritems()); response = None; try: