Client now attempts to reconnect forever on server disconnect and correctly reconnects

This commit is contained in:
Sean Arietta 2014-12-22 21:46:05 -08:00
commit 2d1257bf8f
3 changed files with 55 additions and 24 deletions

View file

@ -145,19 +145,26 @@ class SocketIO(object):
self.wait_for_connection = wait_for_connection
self._namespace_by_path = {}
self.kw = kw
self.__transport = None;
# These two fields work to control the heartbeat thread.
self.heartbeat_terminator = None;
self.heartbeat_thread = None;
# Saved session information.
self.session = None;
# This is stores the set of paths (namespaces) that need to be
# reconnected to.
self.reconnect_paths = {};
# This sets of a chain of events that attempts to connect to
# the server at the base namespace.
self.define(Namespace)
def __enter__(self):
_log.debug("[enter]");
return self
def __exit__(self, *exception_pack):
@ -193,9 +200,19 @@ class SocketIO(object):
self._transport.emit(path, event, args, callback)
def reconnect(self):
"""Reconnects to a set of namespaces.
"""Reconnects the client.
Reconnects to the server and connects to the previously
connected set of namespaces.
"""
_log.debug(" [reconnect attempt]");
# Reconnect to the server.
if self.__transport is not None:
self.__transport.close();
self.__transport = self._get_transport();
for path in self.reconnect_paths:
# We avoid reconnecting to the default namespace because
# socketIO_client connects to that already.
@ -203,6 +220,10 @@ class SocketIO(object):
continue;
_log.debug("Reconnecting to path: %s" % repr(path))
self._transport.connect(path);
# Restore paths.
self._namespace_by_path = copy.copy(self.reconnect_paths);
for namespace in self._namespace_by_path:
self._namespace_by_path[namespace]._transport = self.__transport;
self.reconnect_paths = {};
def wait(self, seconds=None, for_callbacks=False):
@ -212,6 +233,15 @@ class SocketIO(object):
"""
warning_screen = _yield_warning_screen(seconds)
for elapsed_time in warning_screen:
# We will end up here in the case that we
# disconnected.
if len(self.reconnect_paths) > 0:
try:
self.reconnect();
except ConnectionError as e:
time.sleep(1);
continue;
try:
try:
self._process_events()
@ -220,26 +250,23 @@ class SocketIO(object):
if self._stop_waiting(for_callbacks):
break
# We will end up here in the case that we
# disconnected, then reconnected AND we were
# successful.
if len(self.reconnect_paths) > 0:
self.reconnect();
except ConnectionError as e:
try:
# This is where we end up if the connection was
# severed. The client will disconnect here.
if len(self.reconnect_paths) is 0:
self.reconnect_paths = copy.deepcopy(self._namespace_by_path);
self.reconnect_paths = copy.copy(self._namespace_by_path);
self._terminate_heartbeat();
self._terminate_heartbeat();
for namespace in self.reconnect_paths:
self.disconnect(namespace);
warning = Exception('[connection error] %s' % e)
self._transport._connected = False;
warning_screen.throw(warning)
except StopIteration:
_log.warn(warning)
self.disconnect()
_log.debug("[wait canceled]");
def _process_events(self):
@ -317,18 +344,19 @@ class SocketIO(object):
@property
def connected(self):
return self.__transport.connected
return self.__transport.connected if self.__transport is not None else False;
@property
def _transport(self):
try:
if self.connected:
if self.__transport is not None and self.connected:
return self.__transport
except AttributeError:
pass
warning_screen = _yield_warning_screen(seconds=None)
for elapsed_time in warning_screen:
try:
_log.debug("[create transport]");
self.__transport = self._get_transport()
break
except ConnectionError as e:
@ -349,7 +377,7 @@ class SocketIO(object):
if packet.type == PacketType.PONG:
_log.debug("[PONG] %s" % repr(packet));
self.heartbeat_terminator.set();
self._terminate_heartbeat();
# Technically we would need to pause the current
# transport (which should be polling in this
@ -391,6 +419,7 @@ class SocketIO(object):
try:
return self._upgrade();
except:
_log.warn("[websocket] Failed to upgrade to websocket")
pass;
return transport

View file

@ -35,7 +35,7 @@ class Packet():
def encode_as_string(self, for_websocket = False):
data = "";
path = "";
if self.type == PacketType.MESSAGE:
if self.type == PacketType.MESSAGE and not isinstance(self.payload, basestring):
data = self.payload.encode_as_string();
path = self.payload.path;
else:
@ -230,8 +230,9 @@ def decode_packet(packet):
message = decode_message(payload);
payload = message;
return offset + length, Packet(packet_type, payload);
return offset + length - 1, Packet(packet_type, payload);
else:
import ipdb; ipdb.set_trace();
pass;
def encode_packet_string(code, path, data):

View file

@ -41,7 +41,7 @@ class _AbstractTransport(object):
self.close()
def connect(self, path):
if path != "":
if True or path != "":
_log.debug("Connecting to path: %s" % path);
data = Message(MessageType.CONNECT, path).encode_as_string();
self.send_packet(PacketType.MESSAGE, path, data);
@ -50,11 +50,11 @@ class _AbstractTransport(object):
responded = False;
while not responded:
for packet in self.recv_packet():
_log.debug("[connect wait] Waiting for confirmation");
_log.debug("[connect wait] Waiting for confirmation of connect to: %s" % path);
if (packet.type == PacketType.MESSAGE
and packet.payload.type == MessageType.CONNECT
and packet.payload.path == path):
_log.debug("Connected to path: %s" % path);
_log.debug("[connect] Connected to path: %s" % path);
responded = True;
else:
self.send_packet(PacketType.OPEN, path, data);
@ -78,7 +78,7 @@ class _AbstractTransport(object):
_log.debug("[ack] Sending ACK for packet: %d" % packet_id);
message = Message(MessageType.ACK, "", path, "", packet_id);
packet = Packet(PacketType.MESSAGE, message);
self.send_engineio_packet(packet)
self.send_engineio_packet(packet);
def noop(self, path=''):
self.send_packet(PacketType.NOOP, path)
@ -86,7 +86,6 @@ class _AbstractTransport(object):
def send_packet(self, code, path='', data='', callback=None):
packet_text = Packet(code, data).encode_as_string();
self.send(packet_text)
_log.debug('[packet sent] %s', packet_text)
def recv_packet(self):
try:
@ -127,6 +126,7 @@ class WebsocketTransport(_AbstractTransport):
base_url, parser.ENGINE_PROTOCOL, socketIO_session.id)
try:
_log.debug("[websocket] Connecting");
self._connection = websocket.create_connection(self._url)
except socket.timeout as e:
raise ConnectionError(e)
@ -141,17 +141,16 @@ class WebsocketTransport(_AbstractTransport):
def send_message(self, message, callback = None):
packet_text = message.encode_as_string();
self.send(packet_text)
_log.debug('[packet sent] %s', packet_text)
def send_engineio_packet(self, packet, callback=None):
packet_text = packet.encode_as_string(for_websocket = True);
self.send(packet_text)
_log.debug('[packet sent] %s', packet_text)
def send_packet(self, code, path="", data='', callback=None):
self.send_message(Message(code, data), callback);
def send(self, packet_text):
_log.debug("[websocket] send: " + str(packet_text));
try:
self._connection.send(packet_text)
except websocket.WebSocketTimeoutException as e:
@ -186,7 +185,8 @@ class WebsocketTransport(_AbstractTransport):
raise ConnectionError(e)
def close(self):
self._connection.close()
self._connection.close();
self._connected = False;
class XHR_PollingTransport(_AbstractTransport):
@ -207,8 +207,9 @@ class XHR_PollingTransport(_AbstractTransport):
@property
def _params(self):
return dict(t=int(time.time() * 1000))
def send(self, packet_text):
_log.debug("[xhr] send: " + str(packet_text));
uri = self._url + "&" + '&'.join("%s=%s" % (k, v) for (k, v) in self._params.iteritems());
response = None;
try: