From f182df8580899afd7bc28fdff9d77804975e884a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Sureau?= Date: Tue, 25 Mar 2014 16:18:19 +0100 Subject: [PATCH] Implement transports fallback --- socketIO_client/__init__.py | 80 ++++++++++++++++++++++++++--------- socketIO_client/transports.py | 19 --------- 2 files changed, 61 insertions(+), 38 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 3bdecf1..7012cfc 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -6,7 +6,7 @@ from collections import namedtuple from urlparse import urlparse from .exceptions import ConnectionError, TimeoutError, PacketError -from .transports import _get_response, _negotiate_transport, TRANSPORTS +from .transports import _get_response, TRANSPORTS, _WebsocketTransport, _XHR_PollingTransport, _JSONP_PollingTransport _SocketIOSession = namedtuple('_SocketIOSession', [ @@ -127,9 +127,11 @@ class SocketIO(object): def __init__( self, host, port=None, Namespace=BaseNamespace, - wait_for_connection=True, transports=TRANSPORTS, **kw): + wait_for_connection=True, transports=TRANSPORTS, + try_multiple_transports=True, **kw): self.is_secure, self.base_url = _parse_host(host, port) self.wait_for_connection = wait_for_connection + self.try_multiple_transports = try_multiple_transports self._namespace_by_path = {} self.client_supported_transports = transports self.kw = kw @@ -231,39 +233,79 @@ class SocketIO(object): return self.__transport except AttributeError: pass + warning_screen = _yield_warning_screen(seconds=None) + + # Transport handshake + if self.wait_for_connection: + for elapsed_time in warning_screen: + try: + socketIO_session = _get_socketIO_session(self.is_secure, self.base_url, **self.kw) + break + except ConnectionError as e: + try: + warning = Exception('[waiting for handshake] %s' % e) + warning_screen.throw(warning) + except StopIteration: + _log.warn(warning) + + else: + socketIO_session = _get_socketIO_session(self.is_secure, self.base_url, **self.kw) + + supported_transports = self._get_supported_transports(socketIO_session) + _log.debug('[negociated transports] %s', supported_transports) + + # Initialize heartbeat_pacemaker + self.heartbeat_pacemaker = self._make_heartbeat_pacemaker(socketIO_session.heartbeat_timeout / 2) + self.heartbeat_pacemaker.next() + for elapsed_time in warning_screen: + _log.debug('supported_transports = %s', supported_transports) try: - self.__transport = self._get_transport() + _log.debug('[trying transport] %s', supported_transports[0]) + self.__transport = self._get_transport(socketIO_session, supported_transports[0]) break except ConnectionError as e: - if not self.wait_for_connection: + last_transport = len(supported_transports) <= 1 + if not self.wait_for_connection and (not self.try_multiple_transports or last_transport): raise + try: warning = Exception('[waiting for connection] %s' % e) warning_screen.throw(warning) except StopIteration: _log.warn(warning) + + if self.try_multiple_transports and not last_transport: + supported_transports.pop(0) + + self._update_namespaces(self.__transport) return self.__transport - def _get_transport(self): - socketIO_session = _get_socketIO_session( - self.is_secure, self.base_url, **self.kw) - _log.debug('[transports available] %s', ' '.join( - socketIO_session.server_supported_transports)) - # Initialize heartbeat_pacemaker - self.heartbeat_pacemaker = self._make_heartbeat_pacemaker( - heartbeat_interval=socketIO_session.heartbeat_timeout / 2) - self.heartbeat_pacemaker.next() - # Negotiate transport - transport = _negotiate_transport( - self.client_supported_transports, socketIO_session, - self.is_secure, self.base_url, **self.kw) - # Update namespaces + def _get_supported_transports(self, session): + _log.debug('[transports available] %s', ' '.join(session.server_supported_transports)) + supported_transports = [t for t in self.client_supported_transports if t in session.server_supported_transports] + + if not supported_transports: + raise SocketIOError(' '.join([ + 'could not negotiate a transport:', + 'client supports %s but' % ', '.join(client_supported_transports), + 'server supports %s' % ', '.join(server_supported_transports), + ])) + + return supported_transports + + def _get_transport(self, session, transport_name): + return { + 'websocket': _WebsocketTransport, + 'xhr-polling': _XHR_PollingTransport, + 'jsonp-polling': _JSONP_PollingTransport, + }[transport_name](session, self.is_secure, self.base_url, **self.kw) + + def _update_namespaces(self, transport): for path, namespace in self._namespace_by_path.iteritems(): namespace._transport = transport transport.connect(path) - return transport def _make_heartbeat_pacemaker(self, heartbeat_interval): heartbeat_time = 0 diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index ed4e59c..5d911a5 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -274,25 +274,6 @@ class _JSONP_PollingTransport(_AbstractTransport): self._connected = False -def _negotiate_transport( - client_supported_transports, session, - is_secure, base_url, **kw): - server_supported_transports = session.server_supported_transports - for supported_transport in client_supported_transports: - if supported_transport in server_supported_transports: - _log.debug('[transport selected] %s', supported_transport) - return { - 'websocket': _WebsocketTransport, - 'xhr-polling': _XHR_PollingTransport, - 'jsonp-polling': _JSONP_PollingTransport, - }[supported_transport](session, is_secure, base_url, **kw) - raise SocketIOError(' '.join([ - 'could not negotiate a transport:', - 'client supports %s but' % ', '.join(client_supported_transports), - 'server supports %s' % ', '.join(server_supported_transports), - ])) - - def _yield_text_from_framed_data(framed_data, parse=lambda x: x): parts = [parse(x) for x in framed_data.split(BOUNDARY)] for text_length, text in izip(parts[1::2], parts[2::2]):