This commit is contained in:
Roy Hyunjin Han 2015-02-15 13:54:19 -05:00
commit e98f98f9a2
4 changed files with 53 additions and 60 deletions

View file

@ -8,8 +8,11 @@ try:
except ImportError:
from urlparse import urlparse as parse_url
from .exceptions import ConnectionError, TimeoutError, PacketError
from .transports import _get_response, _negotiate_transport, TRANSPORTS
from .exceptions import (
ConnectionError, TimeoutError, PacketError, SocketIOError)
from .transports import (
_get_response, TRANSPORTS,
_WebsocketTransport, _XHR_PollingTransport, _JSONP_PollingTransport)
__version__ = '0.5.4'
@ -24,7 +27,6 @@ RETRY_INTERVAL_IN_SECONDS = 1
class BaseNamespace(object):
'Define client behavior'
def __init__(self, _transport, path):
@ -310,39 +312,70 @@ class SocketIO(object):
except AttributeError:
pass
warning_screen = _yield_warning_screen(seconds=None)
for elapsed_time in warning_screen:
try:
self.__transport = self._get_transport()
socketIO_session = _get_socketIO_session(
self.is_secure, self.base_url, **self.kw)
break
except ConnectionError as e:
if not self.wait_for_connection:
raise
warning = Exception('[waiting for connection] %s' % e)
try:
warning = Exception('[waiting for connection] %s' % e)
warning_screen.throw(warning)
except StopIteration:
self.log(logging.WARNING, warning)
return self.__transport
supported_transports = self._get_supported_transports(socketIO_session)
def _get_transport(self):
socketIO_session = _get_socketIO_session(
self.is_secure, self.base_url, **self.kw)
self.log(logging.DEBUG, '[transports available] %s', ' '.join(
socketIO_session.server_supported_transports))
# Initialize heartbeat_pacemaker
self._heartbeat_interval = socketIO_session.heartbeat_timeout / 2
self.heartbeat_pacemaker = self._make_heartbeat_pacemaker(
heartbeat_interval=self._heartbeat_interval)
next(self.heartbeat_pacemaker)
# Negotiate transport
transport = _negotiate_transport(
self.client_supported_transports, socketIO_session,
self.is_secure, self.base_url, **self.kw)
# Update namespaces
for elapsed_time in warning_screen:
self.log(
logging.DEBUG,
'[trying transport] %s', supported_transports[0])
try:
self.__transport = self._get_transport(
socketIO_session, supported_transports[0])
break
except ConnectionError as e:
try:
supported_transports.pop(0)
except IndexError:
raise
for path, namespace in self._namespace_by_path.items():
namespace._transport = transport
transport.connect(path)
return transport
namespace._transport = self.__transport
self.__transport.connect(path)
return self.__transport
def _get_supported_transports(self, session):
self.log(
logging.DEBUG,
'[transports available] %s',
' '.join(session.server_supported_transports))
supported_transports = list(set(
session.server_supported_transports,
).intersection(self.client_supported_transports))
if not supported_transports:
raise SocketIOError(' '.join([
'could not negotiate a transport:',
'client supports %s but' % ', '.join(
self.client_supported_transports),
'server supports %s' % ', '.join(
session.server_supported_transports),
]))
return supported_transports
def _get_transport(self, session, transport_name):
return {
'websocket': _WebsocketTransport,
'xhr-polling': _XHR_PollingTransport,
'jsonp-polling': _JSONP_PollingTransport,
}[transport_name](session, self.is_secure, self.base_url, **self.kw)
def _make_heartbeat_pacemaker(self, heartbeat_interval):
heartbeat_time = time.time()

View file

@ -177,15 +177,6 @@ class BaseMixin(object):
'ack_callback_response': (PAYLOAD,),
})
"""
def test_rapid_fire(self):
'Capture all server events'
namespace = self.socketIO.define(Namespace)
self.socketIO.emit('rapid_fire')
self.socketIO.wait(30)
self.assertEqual(namespace.messages, range(100000))
"""
class Test_WebsocketTransport(TestCase, BaseMixin):
@ -217,7 +208,6 @@ class Namespace(LoggingNamespace):
self.response = None
self.args_by_event = {}
self.called_on_disconnect = False
self.messages = []
def on_disconnect(self):
self.called_on_disconnect = True
@ -233,6 +223,3 @@ class Namespace(LoggingNamespace):
def on_wait_with_disconnect_response(self):
self.disconnect()
def on_rapid_fire(self, x):
self.messages.append(x)

View file

@ -296,26 +296,6 @@ class _JSONP_PollingTransport(_AbstractTransport):
self._connected = False
def _negotiate_transport(
client_supported_transports, session,
is_secure, base_url, **kw):
server_supported_transports = session.server_supported_transports
for supported_transport in client_supported_transports:
if supported_transport in server_supported_transports:
_log.debug('[%s] [transport selected] %s', base_url,
supported_transport)
return {
'websocket': _WebsocketTransport,
'xhr-polling': _XHR_PollingTransport,
'jsonp-polling': _JSONP_PollingTransport,
}[supported_transport](session, is_secure, base_url, **kw)
raise SocketIOError(' '.join([
'could not negotiate a transport:',
'client supports %s but' % ', '.join(client_supported_transports),
'server supports %s' % ', '.join(server_supported_transports),
]))
def _yield_text_from_framed_data(framed_data, parse=lambda x: x):
parts = [parse(x) for x in framed_data.split(BOUNDARY)]
for text_length, text in zip(parts[1::2], parts[2::2]):