diff --git a/serve_tests.js b/serve_tests.js index 82c8024..f69247a 100644 --- a/serve_tests.js +++ b/serve_tests.js @@ -15,6 +15,7 @@ var main = io.of('').on('connection', function(socket) { } }); socket.on('emit', function() { + console.log('hey'); socket.emit('emit_response'); }); socket.on('emit_with_payload', function(payload) { diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 6dbbc7f..2759f5a 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,20 +1,21 @@ import logging import json import requests -import socket import time -import websocket from collections import namedtuple +from .exceptions import SocketIOConnectionError, _TimeoutError, _PacketError +from .transports import _get_response, _negotiate_transport, TRANSPORTS -_Session = namedtuple('_Session', [ + +_SocketIOSession = namedtuple('_SocketIOSession', [ 'id', 'heartbeat_timeout', 'server_supported_transports', ]) _log = logging.getLogger(__name__) -TRANSPORTS = 'websocket', 'xhr-polling', 'jsonp-polling' PROTOCOL_VERSION = 1 +RETRY_INTERVAL_IN_SECONDS = 1 class BaseNamespace(object): @@ -157,22 +158,26 @@ class SocketIO(object): def wait(self, seconds=None, for_callbacks=False): try: - warning_screen = _yield_warning_screen(seconds, sleep=1) + warning_screen = _yield_warning_screen(seconds) for elapsed_time in warning_screen: try: if for_callbacks and not self._transport.has_ack_callback: break try: - self._process_packet(self._transport.recv_packet()) + packet = self._transport.recv_packet().next() + self._process_packet(packet) except _TimeoutError: pass - except _PacketError as error: - _log.warn('[packet error] %s', error) + except _PacketError as e: + _log.warn('[packet error] %s', e) self.heartbeat_pacemaker.send(elapsed_time) - except SocketIOConnectionError as error: + except SocketIOConnectionError as e: + try: + warning = Exception('[connection error] %s' % e) + warning_screen.throw(warning) + except StopIteration: + _log.warn(warning) self.disconnect() - warning = Exception('[connection error] %s' % error) - warning_screen.throw(warning) except KeyboardInterrupt: pass @@ -198,29 +203,33 @@ class SocketIO(object): return self.__transport except AttributeError: pass - warning_screen = _yield_warning_screen(seconds=None, sleep=1) + warning_screen = _yield_warning_screen(seconds=None) for elapsed_time in warning_screen: try: self.__transport = self._get_transport() break - except SocketIOConnectionError as error: + except SocketIOConnectionError as e: if not self.wait_for_connection: raise - warning = Exception('[waiting for connection] %s' % error) - warning_screen.throw(warning) + try: + warning = Exception('[waiting for connection] %s' % e) + warning_screen.throw(warning) + except StopIteration: + _log.warn(warning) return self.__transport def _get_transport(self): - self.session = _get_session(self.secure, self.base_url, **self.kw) + socketIO_session = _get_socketIO_session( + self.secure, self.base_url, **self.kw) _log.debug('[transports available] %s', ' '.join( - self.session.server_supported_transports)) + socketIO_session.server_supported_transports)) # Initialize heartbeat_pacemaker self.heartbeat_pacemaker = self._make_heartbeat_pacemaker( - heartbeat_interval=self.session.heartbeat_timeout - 2) + heartbeat_interval=socketIO_session.heartbeat_timeout - 2) self.heartbeat_pacemaker.next() # Negotiate transport transport = _negotiate_transport( - self.client_supported_transports, self.session, + self.client_supported_transports, socketIO_session, self.secure, self.base_url, **self.kw) # Update namespaces for namespace in self._namespace_by_path.values(): @@ -314,143 +323,6 @@ class SocketIO(object): return lambda *args: self._transport.ack(packet_id, *args) -class SocketIOError(Exception): - pass - - -class _TimeoutError(Exception): - pass - - -class _PacketError(SocketIOError): - pass - - -class SocketIOConnectionError(SocketIOError): - pass - - -class _AbstractTransport(object): - - def __init__(self): - self._packet_id = 0 - self._callback_by_packet_id = {} - - def disconnect(self, path=''): - if not self.connected: - return - if path: - self.send_packet(0, path) - else: - self.connection.close() - - def connect(self, path): - self.send_packet(1, path) - - def send_heartbeat(self): - self.send_packet(2) - - def message(self, path, data, callback): - if isinstance(data, basestring): - code = 3 - else: - code = 4 - data = json.dumps(data, ensure_ascii=False) - self.send_packet(code, path, data, callback) - - def emit(self, path, event, args, callback): - data = json.dumps(dict(name=event, args=args), ensure_ascii=False) - self.send_packet(5, path, data, callback) - - def ack(self, packet_id, *args): - packet_id = packet_id.rstrip('+') - data = '%s+%s' % ( - packet_id, - json.dumps(args, ensure_ascii=False), - ) if args else packet_id - self.send_packet(6, data=data) - - def noop(self): - self.send_packet(8) - - def send_packet(self, code, path='', data='', callback=None): - packet_id = self.set_ack_callback(callback) if callback else '' - packet_parts = str(code), packet_id, path, data - packet_text = ':'.join(packet_parts) - self.send(packet_text) - _log.debug('[packet sent] %s', packet_text) - - def recv_packet(self): - code, packet_id, path, data = None, None, None, None - packet_text = self.recv() - _log.debug('[packet received] %s', packet_text) - try: - packet_parts = packet_text.split(':', 3) - except AttributeError: - raise _PacketError('invalid packet (%s)' % packet_text) - packet_count = len(packet_parts) - if 4 == packet_count: - code, packet_id, path, data = packet_parts - elif 3 == packet_count: - code, packet_id, path = packet_parts - elif 1 == packet_count: - code = packet_parts[0] - return code, packet_id, path, data - - def set_ack_callback(self, callback): - 'Set callback to be called after server sends an acknowledgment' - self._packet_id += 1 - self._callback_by_packet_id[str(self._packet_id)] = callback - return '%s+' % self._packet_id - - def get_ack_callback(self, packet_id): - 'Get callback to be called after server sends an acknowledgment' - callback = self._callback_by_packet_id[packet_id] - del self._callback_by_packet_id[packet_id] - return callback - - @property - def has_ack_callback(self): - return True if self._callback_by_packet_id else False - - -class _WebsocketTransport(_AbstractTransport): - - def __init__(self, session, secure, base_url, **kw): - super(_WebsocketTransport, self).__init__() - url = '%s://%s/websocket/%s' % ( - 'wss' if secure else 'ws', - base_url, session.id) - _log.debug('[transport selected] %s', url) - try: - self.connection = websocket.create_connection(url) - except socket.timeout as error: - raise SocketIOConnectionError(error) - except socket.error as error: - raise SocketIOConnectionError(error) - self.connection.settimeout(1) - - @property - def connected(self): - return self.connection.connected - - def recv(self): - try: - return self.connection.recv() - except socket.timeout: - raise _TimeoutError - except socket.error as error: - raise SocketIOConnectionError(error) - except websocket.WebSocketConnectionClosedException: - raise SocketIOConnectionError('server closed connection') - - def send(self, packet_text): - try: - self.connection.send(packet_text) - except socket.error: - raise SocketIOConnectionError('could not send %s' % packet_text) - - def find_callback(args, kw=None): 'Return callback whether passed as a last argument or as a keyword' if args and callable(args[-1]): @@ -461,7 +333,7 @@ def find_callback(args, kw=None): return None, args -def _yield_warning_screen(seconds=None, sleep=0): +def _yield_warning_screen(seconds=None): last_warning = None for elapsed_time in _yield_elapsed_time(seconds): try: @@ -471,7 +343,7 @@ def _yield_warning_screen(seconds=None, sleep=0): if last_warning != warning: last_warning = warning _log.warn(warning) - time.sleep(sleep) + time.sleep(RETRY_INTERVAL_IN_SECONDS) def _yield_elapsed_time(seconds=None): @@ -483,40 +355,19 @@ def _yield_elapsed_time(seconds=None): yield time.time() - start_time -def _get_session(secure, base_url, **kw): +def _get_socketIO_session(secure, base_url, **kw): server_url = '%s://%s/' % ('https' if secure else 'http', base_url) try: - response = requests.get(server_url, **kw) - except requests.exceptions.ConnectionError: - raise SocketIOConnectionError('could not start connection') - status = response.status_code - if 200 != status: - raise SocketIOConnectionError('unexpected status code (%s)' % status) + response = _get_response(requests.get, server_url, **kw) + except _TimeoutError as e: + raise SocketIOConnectionError(e) response_parts = response.text.split(':') - return _Session( + return _SocketIOSession( id=response_parts[0], heartbeat_timeout=int(response_parts[1]), server_supported_transports=response_parts[3].split(',')) -def _negotiate_transport( - client_supported_transports, session, - secure, base_url, **kw): - server_supported_transports = session.server_supported_transports - for supported_transport in client_supported_transports: - if supported_transport in server_supported_transports: - return { - 'websocket': _WebsocketTransport, - # 'xhr-polling': - # 'jsonp-polling': - }[supported_transport](session, secure, base_url, **kw) - raise SocketIOError(' '.join([ - 'could not negotiate a transport:', - 'client supports %s but' % ', '.join(client_supported_transports), - 'server supports %s' % ', '.join(server_supported_transports), - ])) - - if __name__ == '__main__': requests_log = logging.getLogger('requests') requests_log.setLevel(logging.WARNING) diff --git a/socketIO_client/exceptions.py b/socketIO_client/exceptions.py new file mode 100644 index 0000000..a614636 --- /dev/null +++ b/socketIO_client/exceptions.py @@ -0,0 +1,14 @@ +class SocketIOError(Exception): + pass + + +class SocketIOConnectionError(SocketIOError): + pass + + +class _TimeoutError(Exception): + pass + + +class _PacketError(SocketIOError): + pass diff --git a/socketIO_client/tests.py b/socketIO_client/tests.py index d16e7cd..6986694 100644 --- a/socketIO_client/tests.py +++ b/socketIO_client/tests.py @@ -1,7 +1,9 @@ import logging -from socketIO_client import SocketIO, BaseNamespace, find_callback from unittest import TestCase +from . import SocketIO, BaseNamespace, find_callback +from .transports import TIMEOUT_IN_SECONDS + HOST = 'localhost' PORT = 8000 @@ -10,22 +12,21 @@ PAYLOAD = {'xxx': 'yyy'} logging.basicConfig(level=logging.DEBUG) -class TestSocketIO(TestCase): +class BaseMixin(TestCase): def setUp(self): - self.socketIO = SocketIO(HOST, PORT) self.called_on_response = False def tearDown(self): del self.socketIO def on_response(self, *args): - self.called_on_response = True for arg in args: if isinstance(arg, dict): self.assertEqual(arg, PAYLOAD) else: self.assertEqual(arg, DATA) + self.called_on_response = True def test_disconnect(self): 'Disconnect' @@ -42,100 +43,97 @@ class TestSocketIO(TestCase): def test_message(self): 'Message' - self.socketIO.define(Namespace) + namespace = self.socketIO.define(Namespace) self.socketIO.message() - self.socketIO.wait(0.1) - namespace = self.socketIO.get_namespace() + self.socketIO.wait(self.wait_time_in_seconds) self.assertEqual(namespace.response, 'message_response') def test_message_with_data(self): 'Message with data' - self.socketIO.define(Namespace) + namespace = self.socketIO.define(Namespace) self.socketIO.message(DATA) - self.socketIO.wait(0.1) - namespace = self.socketIO.get_namespace() + self.socketIO.wait(self.wait_time_in_seconds) self.assertEqual(namespace.response, DATA) def test_message_with_payload(self): 'Message with payload' - self.socketIO.define(Namespace) + namespace = self.socketIO.define(Namespace) self.socketIO.message(PAYLOAD) - self.socketIO.wait(0.1) - namespace = self.socketIO.get_namespace() + self.socketIO.wait(self.wait_time_in_seconds) self.assertEqual(namespace.response, PAYLOAD) def test_message_with_callback(self): 'Message with callback' self.socketIO.message(callback=self.on_response) - self.socketIO.wait_for_callbacks(seconds=0.1) + self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_message_with_callback_with_data(self): 'Message with callback with data' self.socketIO.message(DATA, self.on_response) - self.socketIO.wait_for_callbacks(seconds=0.1) + self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_emit(self): 'Emit' - self.socketIO.define(Namespace) + namespace = self.socketIO.define(Namespace) self.socketIO.emit('emit') - self.socketIO.wait(0.1) - self.assertEqual(self.socketIO.get_namespace().args_by_event, { + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.args_by_event, { 'emit_response': (), }) def test_emit_with_payload(self): 'Emit with payload' - self.socketIO.define(Namespace) + namespace = self.socketIO.define(Namespace) self.socketIO.emit('emit_with_payload', PAYLOAD) - self.socketIO.wait(0.1) - self.assertEqual(self.socketIO.get_namespace().args_by_event, { + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.args_by_event, { 'emit_with_payload_response': (PAYLOAD,), }) def test_emit_with_multiple_payloads(self): 'Emit with multiple payloads' - self.socketIO.define(Namespace) + namespace = self.socketIO.define(Namespace) self.socketIO.emit('emit_with_multiple_payloads', PAYLOAD, PAYLOAD) - self.socketIO.wait(0.1) - self.assertEqual(self.socketIO.get_namespace().args_by_event, { + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.args_by_event, { 'emit_with_multiple_payloads_response': (PAYLOAD, PAYLOAD), }) def test_emit_with_callback(self): 'Emit with callback' self.socketIO.emit('emit_with_callback', self.on_response) - self.socketIO.wait_for_callbacks(seconds=0.1) + self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_emit_with_callback_with_payload(self): 'Emit with callback with payload' self.socketIO.emit('emit_with_callback_with_payload', self.on_response) - self.socketIO.wait_for_callbacks(seconds=0.1) + self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_emit_with_callback_with_multiple_payloads(self): 'Emit with callback with multiple payloads' self.socketIO.emit('emit_with_callback_with_multiple_payloads', self.on_response) - self.socketIO.wait_for_callbacks(seconds=0.1) + self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_emit_with_event(self): 'Emit to trigger an event' self.socketIO.on('emit_with_event_response', self.on_response) self.socketIO.emit('emit_with_event', PAYLOAD) - self.socketIO.wait(0.1) + self.socketIO.wait(self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_ack(self): 'Trigger server callback' - self.socketIO.define(Namespace) + namespace = self.socketIO.define(Namespace) self.socketIO.emit('ack', PAYLOAD) - self.socketIO.wait(0.1) - self.assertEqual(self.socketIO.get_namespace().args_by_event, { + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.args_by_event, { 'ack_response': (PAYLOAD,), 'ack_callback_response': (PAYLOAD,), }) @@ -146,7 +144,7 @@ class TestSocketIO(TestCase): chatNamespace = self.socketIO.define(Namespace, '/chat') newsNamespace = self.socketIO.define(Namespace, '/news') newsNamespace.emit('emit_with_payload', PAYLOAD) - self.socketIO.wait(0.1) + self.socketIO.wait(self.wait_time_in_seconds) self.assertEqual(mainNamespace.args_by_event, {}) self.assertEqual(chatNamespace.args_by_event, {}) self.assertEqual(newsNamespace.args_by_event, { @@ -154,6 +152,30 @@ class TestSocketIO(TestCase): }) +class Test_WebsocketTransport(BaseMixin): + + def setUp(self): + super(Test_WebsocketTransport, self).setUp() + self.socketIO = SocketIO(HOST, PORT, transports=['websocket']) + self.wait_time_in_seconds = 0.1 + + +class Test_XHR_PollingTransport(BaseMixin): + + def setUp(self): + super(Test_XHR_PollingTransport, self).setUp() + self.socketIO = SocketIO(HOST, PORT, transports=['xhr-polling']) + self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 + + +class Test_JSONP_PollingTransport(BaseMixin): + + def setUp(self): + super(Test_JSONP_PollingTransport, self).setUp() + self.socketIO = SocketIO(HOST, PORT, transports=['jsonp-polling']) + self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 + + class Namespace(BaseNamespace): def initialize(self): diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py new file mode 100644 index 0000000..c93ced6 --- /dev/null +++ b/socketIO_client/transports.py @@ -0,0 +1,296 @@ +import json +import logging +import re +import requests +import socket +import time +import websocket +from itertools import izip + +from .exceptions import SocketIOError, SocketIOConnectionError, _TimeoutError + + +TRANSPORTS = 'websocket', 'xhr-polling', 'jsonp-polling' +BOUNDARY = u'\ufffd'.encode('utf-8') +TIMEOUT_IN_SECONDS = 2 +_log = logging.getLogger(__name__) + + +class _AbstractTransport(object): + + def __init__(self): + self._packet_id = 0 + self._callback_by_packet_id = {} + + def disconnect(self, path=''): + if not self.connected: + return + if path: + self.send_packet(0, path) + else: + self.close() + + def connect(self, path): + self.send_packet(1, path) + + def send_heartbeat(self): + self.send_packet(2) + + def message(self, path, data, callback): + if isinstance(data, basestring): + code = 3 + else: + code = 4 + data = json.dumps(data, ensure_ascii=False) + self.send_packet(code, path, data, callback) + + def emit(self, path, event, args, callback): + data = json.dumps(dict(name=event, args=args), ensure_ascii=False) + self.send_packet(5, path, data, callback) + + def ack(self, packet_id, *args): + packet_id = packet_id.rstrip('+') + data = '%s+%s' % ( + packet_id, + json.dumps(args, ensure_ascii=False), + ) if args else packet_id + self.send_packet(6, data=data) + + def noop(self): + self.send_packet(8) + + def send_packet(self, code, path='', data='', callback=None): + packet_id = self.set_ack_callback(callback) if callback else '' + packet_parts = str(code), packet_id, path, data + packet_text = ':'.join(packet_parts) + self.send(packet_text) + _log.debug('[packet sent] %s', packet_text) + + def recv_packet(self): + code, packet_id, path, data = None, None, None, None + for packet_text in self.recv(): + _log.debug('[packet received] %s', packet_text) + try: + packet_parts = packet_text.split(':', 3) + except AttributeError: + _log.warn('[packet error] %s', packet_text) + continue + packet_count = len(packet_parts) + if 4 == packet_count: + code, packet_id, path, data = packet_parts + elif 3 == packet_count: + code, packet_id, path = packet_parts + elif 1 == packet_count: + code = packet_parts[0] + yield code, packet_id, path, data + + def set_ack_callback(self, callback): + 'Set callback to be called after server sends an acknowledgment' + self._packet_id += 1 + self._callback_by_packet_id[str(self._packet_id)] = callback + return '%s+' % self._packet_id + + def get_ack_callback(self, packet_id): + 'Get callback to be called after server sends an acknowledgment' + callback = self._callback_by_packet_id[packet_id] + del self._callback_by_packet_id[packet_id] + return callback + + @property + def has_ack_callback(self): + return True if self._callback_by_packet_id else False + + +class _WebsocketTransport(_AbstractTransport): + + def __init__(self, socketIO_session, secure, base_url, **kw): + super(_WebsocketTransport, self).__init__() + url = '%s://%s/websocket/%s' % ( + 'wss' if secure else 'ws', + base_url, socketIO_session.id) + try: + self._connection = websocket.create_connection(url) + except socket.timeout as e: + raise SocketIOConnectionError(e) + except socket.error as e: + raise SocketIOConnectionError(e) + self._connection.settimeout(TIMEOUT_IN_SECONDS) + + @property + def connected(self): + return self._connection.connected + + def send(self, packet_text): + try: + self._connection.send(packet_text) + except socket.error: + raise SocketIOConnectionError('could not send %s' % packet_text) + + def recv(self): + try: + yield self._connection.recv() + except socket.timeout: + raise _TimeoutError + except socket.error as e: + raise SocketIOConnectionError(e) + except websocket.WebSocketConnectionClosedException as e: + raise SocketIOConnectionError('connection closed (%s)' % e) + + def close(self): + self._connection.close() + + +class _XHR_PollingTransport(_AbstractTransport): + + def __init__(self, socketIO_session, secure, base_url, **kw): + super(_XHR_PollingTransport, self).__init__() + self._url = '%s://%s/xhr-polling/%s' % ( + 'https' if secure else 'http', + base_url, socketIO_session.id) + self._connected = True + self._http_session = _prepare_http_session(kw) + + @property + def connected(self): + return self._connected + + @property + def _params(self): + return dict(t=time.time()) + + def send(self, packet_text): + _get_response( + self._http_session.post, + self._url, + params=self._params, + data=packet_text, + timeout=TIMEOUT_IN_SECONDS) + + def recv(self): + response = _get_response( + self._http_session.get, + self._url, + params=self._params, + timeout=TIMEOUT_IN_SECONDS) + encoded_text = response.text.encode('utf-8') + if not encoded_text.startswith(BOUNDARY): + yield encoded_text.decode('utf-8') + for packet_text in _yield_text_from_framed_data(encoded_text): + yield packet_text + + def close(self): + _get_response( + self._http_session.get, + self._url, + params=dict(self._params.items() + [('disconnect', True)])) + self._connected = False + + +class _JSONP_PollingTransport(_AbstractTransport): + + DATA_PATTERN = re.compile(r'io.j\[(\d+)\]\("(.*)"\);') + + def __init__(self, socketIO_session, secure, base_url, **kw): + super(_JSONP_PollingTransport, self).__init__() + self._url = '%s://%s/jsonp-polling/%s' % ( + 'https' if secure else 'http', + base_url, socketIO_session.id) + self._connected = True + self._http_session = _prepare_http_session(kw) + self._jsonp_id = 0 + + @property + def connected(self): + return self._connected + + @property + def _params(self): + return dict(t=time.time(), jsonp=self._jsonp_id) + + def send(self, packet_text): + _get_response( + self._http_session.post, + self._url, + params=self._params, + data='d=%s' % requests.utils.quote(packet_text), + headers={'content-type': 'application/x-www-form-urlencoded'}, + timeout=TIMEOUT_IN_SECONDS) + + def recv(self): + response = _get_response( + self._http_session.get, + self._url, + params=self._params, + headers={'content-type': 'application/javascript'}, + timeout=TIMEOUT_IN_SECONDS) + encoded_text = response.text.encode('utf-8') + if not encoded_text.startswith(BOUNDARY): + self._jsonp_id, encoded_data = self.DATA_PATTERN.match( + encoded_text).groups() + yield encoded_data.decode('utf-8') + for packet_text in _yield_text_from_framed_data(encoded_text): + yield packet_text + + def close(self): + _get_response( + self._http_session.get, + self._url, + params=dict(self._params.items() + [('disconnect', True)])) + self._connected = False + + +def _negotiate_transport( + client_supported_transports, session, + secure, base_url, **kw): + server_supported_transports = session.server_supported_transports + for supported_transport in client_supported_transports: + if supported_transport in server_supported_transports: + _log.debug('[transport selected] %s', supported_transport) + return { + 'websocket': _WebsocketTransport, + 'xhr-polling': _XHR_PollingTransport, + 'jsonp-polling': _JSONP_PollingTransport, + }[supported_transport](session, secure, base_url, **kw) + raise SocketIOError(' '.join([ + 'could not negotiate a transport:', + 'client supports %s but' % ', '.join(client_supported_transports), + 'server supports %s' % ', '.join(server_supported_transports), + ])) + + +def _yield_text_from_framed_data(framed_data): + parts = [x.decode('utf-8') for x in framed_data.split(BOUNDARY)] + for text_length, text in izip(parts[1::2], parts[2::2]): + if text_length == str(len(text)): + yield text + warning = 'invalid declared length=%s for packet_text=%s' % ( + text_length, text) + _log.warn('[packet error] %s', warning) + + +def _get_response(request, *args, **kw): + try: + response = request(*args, **kw) + except requests.exceptions.Timeout as e: + raise _TimeoutError(e) + except requests.exceptions.ConnectionError as e: + raise SocketIOConnectionError(e) + except requests.exceptions.SSLError as e: + raise SocketIOConnectionError('could not negotiate SSL (%s)' % e) + status = response.status_code + if 200 != status: + raise SocketIOConnectionError('unexpected status code (%s)' % status) + return response + + +def _prepare_http_session(kw): + http_session = requests.Session() + http_session.headers.update(kw.get('headers', {})) + http_session.auth = kw.get('auth') + http_session.proxies.update(kw.get('proxies', {})) + http_session.hooks.update(kw.get('hooks', {})) + http_session.params.update(kw.get('params', {})) + http_session.verify = kw.get('verify') + http_session.cert = kw.get('cert') + http_session.cookies.update(kw.get('cookies', {})) + return http_session