Debugging alternate transports
This commit is contained in:
parent
096d41c072
commit
018e566049
5 changed files with 400 additions and 216 deletions
|
|
@ -15,6 +15,7 @@ var main = io.of('').on('connection', function(socket) {
|
|||
}
|
||||
});
|
||||
socket.on('emit', function() {
|
||||
console.log('hey');
|
||||
socket.emit('emit_response');
|
||||
});
|
||||
socket.on('emit_with_payload', function(payload) {
|
||||
|
|
|
|||
|
|
@ -1,20 +1,21 @@
|
|||
import logging
|
||||
import json
|
||||
import requests
|
||||
import socket
|
||||
import time
|
||||
import websocket
|
||||
from collections import namedtuple
|
||||
|
||||
from .exceptions import SocketIOConnectionError, _TimeoutError, _PacketError
|
||||
from .transports import _get_response, _negotiate_transport, TRANSPORTS
|
||||
|
||||
_Session = namedtuple('_Session', [
|
||||
|
||||
_SocketIOSession = namedtuple('_SocketIOSession', [
|
||||
'id',
|
||||
'heartbeat_timeout',
|
||||
'server_supported_transports',
|
||||
])
|
||||
_log = logging.getLogger(__name__)
|
||||
TRANSPORTS = 'websocket', 'xhr-polling', 'jsonp-polling'
|
||||
PROTOCOL_VERSION = 1
|
||||
RETRY_INTERVAL_IN_SECONDS = 1
|
||||
|
||||
|
||||
class BaseNamespace(object):
|
||||
|
|
@ -157,22 +158,26 @@ class SocketIO(object):
|
|||
|
||||
def wait(self, seconds=None, for_callbacks=False):
|
||||
try:
|
||||
warning_screen = _yield_warning_screen(seconds, sleep=1)
|
||||
warning_screen = _yield_warning_screen(seconds)
|
||||
for elapsed_time in warning_screen:
|
||||
try:
|
||||
if for_callbacks and not self._transport.has_ack_callback:
|
||||
break
|
||||
try:
|
||||
self._process_packet(self._transport.recv_packet())
|
||||
packet = self._transport.recv_packet().next()
|
||||
self._process_packet(packet)
|
||||
except _TimeoutError:
|
||||
pass
|
||||
except _PacketError as error:
|
||||
_log.warn('[packet error] %s', error)
|
||||
except _PacketError as e:
|
||||
_log.warn('[packet error] %s', e)
|
||||
self.heartbeat_pacemaker.send(elapsed_time)
|
||||
except SocketIOConnectionError as error:
|
||||
except SocketIOConnectionError as e:
|
||||
try:
|
||||
warning = Exception('[connection error] %s' % e)
|
||||
warning_screen.throw(warning)
|
||||
except StopIteration:
|
||||
_log.warn(warning)
|
||||
self.disconnect()
|
||||
warning = Exception('[connection error] %s' % error)
|
||||
warning_screen.throw(warning)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
|
@ -198,29 +203,33 @@ class SocketIO(object):
|
|||
return self.__transport
|
||||
except AttributeError:
|
||||
pass
|
||||
warning_screen = _yield_warning_screen(seconds=None, sleep=1)
|
||||
warning_screen = _yield_warning_screen(seconds=None)
|
||||
for elapsed_time in warning_screen:
|
||||
try:
|
||||
self.__transport = self._get_transport()
|
||||
break
|
||||
except SocketIOConnectionError as error:
|
||||
except SocketIOConnectionError as e:
|
||||
if not self.wait_for_connection:
|
||||
raise
|
||||
warning = Exception('[waiting for connection] %s' % error)
|
||||
warning_screen.throw(warning)
|
||||
try:
|
||||
warning = Exception('[waiting for connection] %s' % e)
|
||||
warning_screen.throw(warning)
|
||||
except StopIteration:
|
||||
_log.warn(warning)
|
||||
return self.__transport
|
||||
|
||||
def _get_transport(self):
|
||||
self.session = _get_session(self.secure, self.base_url, **self.kw)
|
||||
socketIO_session = _get_socketIO_session(
|
||||
self.secure, self.base_url, **self.kw)
|
||||
_log.debug('[transports available] %s', ' '.join(
|
||||
self.session.server_supported_transports))
|
||||
socketIO_session.server_supported_transports))
|
||||
# Initialize heartbeat_pacemaker
|
||||
self.heartbeat_pacemaker = self._make_heartbeat_pacemaker(
|
||||
heartbeat_interval=self.session.heartbeat_timeout - 2)
|
||||
heartbeat_interval=socketIO_session.heartbeat_timeout - 2)
|
||||
self.heartbeat_pacemaker.next()
|
||||
# Negotiate transport
|
||||
transport = _negotiate_transport(
|
||||
self.client_supported_transports, self.session,
|
||||
self.client_supported_transports, socketIO_session,
|
||||
self.secure, self.base_url, **self.kw)
|
||||
# Update namespaces
|
||||
for namespace in self._namespace_by_path.values():
|
||||
|
|
@ -314,143 +323,6 @@ class SocketIO(object):
|
|||
return lambda *args: self._transport.ack(packet_id, *args)
|
||||
|
||||
|
||||
class SocketIOError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class _TimeoutError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class _PacketError(SocketIOError):
|
||||
pass
|
||||
|
||||
|
||||
class SocketIOConnectionError(SocketIOError):
|
||||
pass
|
||||
|
||||
|
||||
class _AbstractTransport(object):
|
||||
|
||||
def __init__(self):
|
||||
self._packet_id = 0
|
||||
self._callback_by_packet_id = {}
|
||||
|
||||
def disconnect(self, path=''):
|
||||
if not self.connected:
|
||||
return
|
||||
if path:
|
||||
self.send_packet(0, path)
|
||||
else:
|
||||
self.connection.close()
|
||||
|
||||
def connect(self, path):
|
||||
self.send_packet(1, path)
|
||||
|
||||
def send_heartbeat(self):
|
||||
self.send_packet(2)
|
||||
|
||||
def message(self, path, data, callback):
|
||||
if isinstance(data, basestring):
|
||||
code = 3
|
||||
else:
|
||||
code = 4
|
||||
data = json.dumps(data, ensure_ascii=False)
|
||||
self.send_packet(code, path, data, callback)
|
||||
|
||||
def emit(self, path, event, args, callback):
|
||||
data = json.dumps(dict(name=event, args=args), ensure_ascii=False)
|
||||
self.send_packet(5, path, data, callback)
|
||||
|
||||
def ack(self, packet_id, *args):
|
||||
packet_id = packet_id.rstrip('+')
|
||||
data = '%s+%s' % (
|
||||
packet_id,
|
||||
json.dumps(args, ensure_ascii=False),
|
||||
) if args else packet_id
|
||||
self.send_packet(6, data=data)
|
||||
|
||||
def noop(self):
|
||||
self.send_packet(8)
|
||||
|
||||
def send_packet(self, code, path='', data='', callback=None):
|
||||
packet_id = self.set_ack_callback(callback) if callback else ''
|
||||
packet_parts = str(code), packet_id, path, data
|
||||
packet_text = ':'.join(packet_parts)
|
||||
self.send(packet_text)
|
||||
_log.debug('[packet sent] %s', packet_text)
|
||||
|
||||
def recv_packet(self):
|
||||
code, packet_id, path, data = None, None, None, None
|
||||
packet_text = self.recv()
|
||||
_log.debug('[packet received] %s', packet_text)
|
||||
try:
|
||||
packet_parts = packet_text.split(':', 3)
|
||||
except AttributeError:
|
||||
raise _PacketError('invalid packet (%s)' % packet_text)
|
||||
packet_count = len(packet_parts)
|
||||
if 4 == packet_count:
|
||||
code, packet_id, path, data = packet_parts
|
||||
elif 3 == packet_count:
|
||||
code, packet_id, path = packet_parts
|
||||
elif 1 == packet_count:
|
||||
code = packet_parts[0]
|
||||
return code, packet_id, path, data
|
||||
|
||||
def set_ack_callback(self, callback):
|
||||
'Set callback to be called after server sends an acknowledgment'
|
||||
self._packet_id += 1
|
||||
self._callback_by_packet_id[str(self._packet_id)] = callback
|
||||
return '%s+' % self._packet_id
|
||||
|
||||
def get_ack_callback(self, packet_id):
|
||||
'Get callback to be called after server sends an acknowledgment'
|
||||
callback = self._callback_by_packet_id[packet_id]
|
||||
del self._callback_by_packet_id[packet_id]
|
||||
return callback
|
||||
|
||||
@property
|
||||
def has_ack_callback(self):
|
||||
return True if self._callback_by_packet_id else False
|
||||
|
||||
|
||||
class _WebsocketTransport(_AbstractTransport):
|
||||
|
||||
def __init__(self, session, secure, base_url, **kw):
|
||||
super(_WebsocketTransport, self).__init__()
|
||||
url = '%s://%s/websocket/%s' % (
|
||||
'wss' if secure else 'ws',
|
||||
base_url, session.id)
|
||||
_log.debug('[transport selected] %s', url)
|
||||
try:
|
||||
self.connection = websocket.create_connection(url)
|
||||
except socket.timeout as error:
|
||||
raise SocketIOConnectionError(error)
|
||||
except socket.error as error:
|
||||
raise SocketIOConnectionError(error)
|
||||
self.connection.settimeout(1)
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
return self.connection.connected
|
||||
|
||||
def recv(self):
|
||||
try:
|
||||
return self.connection.recv()
|
||||
except socket.timeout:
|
||||
raise _TimeoutError
|
||||
except socket.error as error:
|
||||
raise SocketIOConnectionError(error)
|
||||
except websocket.WebSocketConnectionClosedException:
|
||||
raise SocketIOConnectionError('server closed connection')
|
||||
|
||||
def send(self, packet_text):
|
||||
try:
|
||||
self.connection.send(packet_text)
|
||||
except socket.error:
|
||||
raise SocketIOConnectionError('could not send %s' % packet_text)
|
||||
|
||||
|
||||
def find_callback(args, kw=None):
|
||||
'Return callback whether passed as a last argument or as a keyword'
|
||||
if args and callable(args[-1]):
|
||||
|
|
@ -461,7 +333,7 @@ def find_callback(args, kw=None):
|
|||
return None, args
|
||||
|
||||
|
||||
def _yield_warning_screen(seconds=None, sleep=0):
|
||||
def _yield_warning_screen(seconds=None):
|
||||
last_warning = None
|
||||
for elapsed_time in _yield_elapsed_time(seconds):
|
||||
try:
|
||||
|
|
@ -471,7 +343,7 @@ def _yield_warning_screen(seconds=None, sleep=0):
|
|||
if last_warning != warning:
|
||||
last_warning = warning
|
||||
_log.warn(warning)
|
||||
time.sleep(sleep)
|
||||
time.sleep(RETRY_INTERVAL_IN_SECONDS)
|
||||
|
||||
|
||||
def _yield_elapsed_time(seconds=None):
|
||||
|
|
@ -483,40 +355,19 @@ def _yield_elapsed_time(seconds=None):
|
|||
yield time.time() - start_time
|
||||
|
||||
|
||||
def _get_session(secure, base_url, **kw):
|
||||
def _get_socketIO_session(secure, base_url, **kw):
|
||||
server_url = '%s://%s/' % ('https' if secure else 'http', base_url)
|
||||
try:
|
||||
response = requests.get(server_url, **kw)
|
||||
except requests.exceptions.ConnectionError:
|
||||
raise SocketIOConnectionError('could not start connection')
|
||||
status = response.status_code
|
||||
if 200 != status:
|
||||
raise SocketIOConnectionError('unexpected status code (%s)' % status)
|
||||
response = _get_response(requests.get, server_url, **kw)
|
||||
except _TimeoutError as e:
|
||||
raise SocketIOConnectionError(e)
|
||||
response_parts = response.text.split(':')
|
||||
return _Session(
|
||||
return _SocketIOSession(
|
||||
id=response_parts[0],
|
||||
heartbeat_timeout=int(response_parts[1]),
|
||||
server_supported_transports=response_parts[3].split(','))
|
||||
|
||||
|
||||
def _negotiate_transport(
|
||||
client_supported_transports, session,
|
||||
secure, base_url, **kw):
|
||||
server_supported_transports = session.server_supported_transports
|
||||
for supported_transport in client_supported_transports:
|
||||
if supported_transport in server_supported_transports:
|
||||
return {
|
||||
'websocket': _WebsocketTransport,
|
||||
# 'xhr-polling':
|
||||
# 'jsonp-polling':
|
||||
}[supported_transport](session, secure, base_url, **kw)
|
||||
raise SocketIOError(' '.join([
|
||||
'could not negotiate a transport:',
|
||||
'client supports %s but' % ', '.join(client_supported_transports),
|
||||
'server supports %s' % ', '.join(server_supported_transports),
|
||||
]))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
requests_log = logging.getLogger('requests')
|
||||
requests_log.setLevel(logging.WARNING)
|
||||
|
|
|
|||
14
socketIO_client/exceptions.py
Normal file
14
socketIO_client/exceptions.py
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
class SocketIOError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SocketIOConnectionError(SocketIOError):
|
||||
pass
|
||||
|
||||
|
||||
class _TimeoutError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class _PacketError(SocketIOError):
|
||||
pass
|
||||
|
|
@ -1,7 +1,9 @@
|
|||
import logging
|
||||
from socketIO_client import SocketIO, BaseNamespace, find_callback
|
||||
from unittest import TestCase
|
||||
|
||||
from . import SocketIO, BaseNamespace, find_callback
|
||||
from .transports import TIMEOUT_IN_SECONDS
|
||||
|
||||
|
||||
HOST = 'localhost'
|
||||
PORT = 8000
|
||||
|
|
@ -10,22 +12,21 @@ PAYLOAD = {'xxx': 'yyy'}
|
|||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
|
||||
class TestSocketIO(TestCase):
|
||||
class BaseMixin(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.socketIO = SocketIO(HOST, PORT)
|
||||
self.called_on_response = False
|
||||
|
||||
def tearDown(self):
|
||||
del self.socketIO
|
||||
|
||||
def on_response(self, *args):
|
||||
self.called_on_response = True
|
||||
for arg in args:
|
||||
if isinstance(arg, dict):
|
||||
self.assertEqual(arg, PAYLOAD)
|
||||
else:
|
||||
self.assertEqual(arg, DATA)
|
||||
self.called_on_response = True
|
||||
|
||||
def test_disconnect(self):
|
||||
'Disconnect'
|
||||
|
|
@ -42,100 +43,97 @@ class TestSocketIO(TestCase):
|
|||
|
||||
def test_message(self):
|
||||
'Message'
|
||||
self.socketIO.define(Namespace)
|
||||
namespace = self.socketIO.define(Namespace)
|
||||
self.socketIO.message()
|
||||
self.socketIO.wait(0.1)
|
||||
namespace = self.socketIO.get_namespace()
|
||||
self.socketIO.wait(self.wait_time_in_seconds)
|
||||
self.assertEqual(namespace.response, 'message_response')
|
||||
|
||||
def test_message_with_data(self):
|
||||
'Message with data'
|
||||
self.socketIO.define(Namespace)
|
||||
namespace = self.socketIO.define(Namespace)
|
||||
self.socketIO.message(DATA)
|
||||
self.socketIO.wait(0.1)
|
||||
namespace = self.socketIO.get_namespace()
|
||||
self.socketIO.wait(self.wait_time_in_seconds)
|
||||
self.assertEqual(namespace.response, DATA)
|
||||
|
||||
def test_message_with_payload(self):
|
||||
'Message with payload'
|
||||
self.socketIO.define(Namespace)
|
||||
namespace = self.socketIO.define(Namespace)
|
||||
self.socketIO.message(PAYLOAD)
|
||||
self.socketIO.wait(0.1)
|
||||
namespace = self.socketIO.get_namespace()
|
||||
self.socketIO.wait(self.wait_time_in_seconds)
|
||||
self.assertEqual(namespace.response, PAYLOAD)
|
||||
|
||||
def test_message_with_callback(self):
|
||||
'Message with callback'
|
||||
self.socketIO.message(callback=self.on_response)
|
||||
self.socketIO.wait_for_callbacks(seconds=0.1)
|
||||
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
|
||||
self.assertTrue(self.called_on_response)
|
||||
|
||||
def test_message_with_callback_with_data(self):
|
||||
'Message with callback with data'
|
||||
self.socketIO.message(DATA, self.on_response)
|
||||
self.socketIO.wait_for_callbacks(seconds=0.1)
|
||||
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
|
||||
self.assertTrue(self.called_on_response)
|
||||
|
||||
def test_emit(self):
|
||||
'Emit'
|
||||
self.socketIO.define(Namespace)
|
||||
namespace = self.socketIO.define(Namespace)
|
||||
self.socketIO.emit('emit')
|
||||
self.socketIO.wait(0.1)
|
||||
self.assertEqual(self.socketIO.get_namespace().args_by_event, {
|
||||
self.socketIO.wait(self.wait_time_in_seconds)
|
||||
self.assertEqual(namespace.args_by_event, {
|
||||
'emit_response': (),
|
||||
})
|
||||
|
||||
def test_emit_with_payload(self):
|
||||
'Emit with payload'
|
||||
self.socketIO.define(Namespace)
|
||||
namespace = self.socketIO.define(Namespace)
|
||||
self.socketIO.emit('emit_with_payload', PAYLOAD)
|
||||
self.socketIO.wait(0.1)
|
||||
self.assertEqual(self.socketIO.get_namespace().args_by_event, {
|
||||
self.socketIO.wait(self.wait_time_in_seconds)
|
||||
self.assertEqual(namespace.args_by_event, {
|
||||
'emit_with_payload_response': (PAYLOAD,),
|
||||
})
|
||||
|
||||
def test_emit_with_multiple_payloads(self):
|
||||
'Emit with multiple payloads'
|
||||
self.socketIO.define(Namespace)
|
||||
namespace = self.socketIO.define(Namespace)
|
||||
self.socketIO.emit('emit_with_multiple_payloads', PAYLOAD, PAYLOAD)
|
||||
self.socketIO.wait(0.1)
|
||||
self.assertEqual(self.socketIO.get_namespace().args_by_event, {
|
||||
self.socketIO.wait(self.wait_time_in_seconds)
|
||||
self.assertEqual(namespace.args_by_event, {
|
||||
'emit_with_multiple_payloads_response': (PAYLOAD, PAYLOAD),
|
||||
})
|
||||
|
||||
def test_emit_with_callback(self):
|
||||
'Emit with callback'
|
||||
self.socketIO.emit('emit_with_callback', self.on_response)
|
||||
self.socketIO.wait_for_callbacks(seconds=0.1)
|
||||
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
|
||||
self.assertTrue(self.called_on_response)
|
||||
|
||||
def test_emit_with_callback_with_payload(self):
|
||||
'Emit with callback with payload'
|
||||
self.socketIO.emit('emit_with_callback_with_payload',
|
||||
self.on_response)
|
||||
self.socketIO.wait_for_callbacks(seconds=0.1)
|
||||
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
|
||||
self.assertTrue(self.called_on_response)
|
||||
|
||||
def test_emit_with_callback_with_multiple_payloads(self):
|
||||
'Emit with callback with multiple payloads'
|
||||
self.socketIO.emit('emit_with_callback_with_multiple_payloads',
|
||||
self.on_response)
|
||||
self.socketIO.wait_for_callbacks(seconds=0.1)
|
||||
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
|
||||
self.assertTrue(self.called_on_response)
|
||||
|
||||
def test_emit_with_event(self):
|
||||
'Emit to trigger an event'
|
||||
self.socketIO.on('emit_with_event_response', self.on_response)
|
||||
self.socketIO.emit('emit_with_event', PAYLOAD)
|
||||
self.socketIO.wait(0.1)
|
||||
self.socketIO.wait(self.wait_time_in_seconds)
|
||||
self.assertTrue(self.called_on_response)
|
||||
|
||||
def test_ack(self):
|
||||
'Trigger server callback'
|
||||
self.socketIO.define(Namespace)
|
||||
namespace = self.socketIO.define(Namespace)
|
||||
self.socketIO.emit('ack', PAYLOAD)
|
||||
self.socketIO.wait(0.1)
|
||||
self.assertEqual(self.socketIO.get_namespace().args_by_event, {
|
||||
self.socketIO.wait(self.wait_time_in_seconds)
|
||||
self.assertEqual(namespace.args_by_event, {
|
||||
'ack_response': (PAYLOAD,),
|
||||
'ack_callback_response': (PAYLOAD,),
|
||||
})
|
||||
|
|
@ -146,7 +144,7 @@ class TestSocketIO(TestCase):
|
|||
chatNamespace = self.socketIO.define(Namespace, '/chat')
|
||||
newsNamespace = self.socketIO.define(Namespace, '/news')
|
||||
newsNamespace.emit('emit_with_payload', PAYLOAD)
|
||||
self.socketIO.wait(0.1)
|
||||
self.socketIO.wait(self.wait_time_in_seconds)
|
||||
self.assertEqual(mainNamespace.args_by_event, {})
|
||||
self.assertEqual(chatNamespace.args_by_event, {})
|
||||
self.assertEqual(newsNamespace.args_by_event, {
|
||||
|
|
@ -154,6 +152,30 @@ class TestSocketIO(TestCase):
|
|||
})
|
||||
|
||||
|
||||
class Test_WebsocketTransport(BaseMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(Test_WebsocketTransport, self).setUp()
|
||||
self.socketIO = SocketIO(HOST, PORT, transports=['websocket'])
|
||||
self.wait_time_in_seconds = 0.1
|
||||
|
||||
|
||||
class Test_XHR_PollingTransport(BaseMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(Test_XHR_PollingTransport, self).setUp()
|
||||
self.socketIO = SocketIO(HOST, PORT, transports=['xhr-polling'])
|
||||
self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1
|
||||
|
||||
|
||||
class Test_JSONP_PollingTransport(BaseMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(Test_JSONP_PollingTransport, self).setUp()
|
||||
self.socketIO = SocketIO(HOST, PORT, transports=['jsonp-polling'])
|
||||
self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1
|
||||
|
||||
|
||||
class Namespace(BaseNamespace):
|
||||
|
||||
def initialize(self):
|
||||
|
|
|
|||
296
socketIO_client/transports.py
Normal file
296
socketIO_client/transports.py
Normal file
|
|
@ -0,0 +1,296 @@
|
|||
import json
|
||||
import logging
|
||||
import re
|
||||
import requests
|
||||
import socket
|
||||
import time
|
||||
import websocket
|
||||
from itertools import izip
|
||||
|
||||
from .exceptions import SocketIOError, SocketIOConnectionError, _TimeoutError
|
||||
|
||||
|
||||
TRANSPORTS = 'websocket', 'xhr-polling', 'jsonp-polling'
|
||||
BOUNDARY = u'\ufffd'.encode('utf-8')
|
||||
TIMEOUT_IN_SECONDS = 2
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _AbstractTransport(object):
|
||||
|
||||
def __init__(self):
|
||||
self._packet_id = 0
|
||||
self._callback_by_packet_id = {}
|
||||
|
||||
def disconnect(self, path=''):
|
||||
if not self.connected:
|
||||
return
|
||||
if path:
|
||||
self.send_packet(0, path)
|
||||
else:
|
||||
self.close()
|
||||
|
||||
def connect(self, path):
|
||||
self.send_packet(1, path)
|
||||
|
||||
def send_heartbeat(self):
|
||||
self.send_packet(2)
|
||||
|
||||
def message(self, path, data, callback):
|
||||
if isinstance(data, basestring):
|
||||
code = 3
|
||||
else:
|
||||
code = 4
|
||||
data = json.dumps(data, ensure_ascii=False)
|
||||
self.send_packet(code, path, data, callback)
|
||||
|
||||
def emit(self, path, event, args, callback):
|
||||
data = json.dumps(dict(name=event, args=args), ensure_ascii=False)
|
||||
self.send_packet(5, path, data, callback)
|
||||
|
||||
def ack(self, packet_id, *args):
|
||||
packet_id = packet_id.rstrip('+')
|
||||
data = '%s+%s' % (
|
||||
packet_id,
|
||||
json.dumps(args, ensure_ascii=False),
|
||||
) if args else packet_id
|
||||
self.send_packet(6, data=data)
|
||||
|
||||
def noop(self):
|
||||
self.send_packet(8)
|
||||
|
||||
def send_packet(self, code, path='', data='', callback=None):
|
||||
packet_id = self.set_ack_callback(callback) if callback else ''
|
||||
packet_parts = str(code), packet_id, path, data
|
||||
packet_text = ':'.join(packet_parts)
|
||||
self.send(packet_text)
|
||||
_log.debug('[packet sent] %s', packet_text)
|
||||
|
||||
def recv_packet(self):
|
||||
code, packet_id, path, data = None, None, None, None
|
||||
for packet_text in self.recv():
|
||||
_log.debug('[packet received] %s', packet_text)
|
||||
try:
|
||||
packet_parts = packet_text.split(':', 3)
|
||||
except AttributeError:
|
||||
_log.warn('[packet error] %s', packet_text)
|
||||
continue
|
||||
packet_count = len(packet_parts)
|
||||
if 4 == packet_count:
|
||||
code, packet_id, path, data = packet_parts
|
||||
elif 3 == packet_count:
|
||||
code, packet_id, path = packet_parts
|
||||
elif 1 == packet_count:
|
||||
code = packet_parts[0]
|
||||
yield code, packet_id, path, data
|
||||
|
||||
def set_ack_callback(self, callback):
|
||||
'Set callback to be called after server sends an acknowledgment'
|
||||
self._packet_id += 1
|
||||
self._callback_by_packet_id[str(self._packet_id)] = callback
|
||||
return '%s+' % self._packet_id
|
||||
|
||||
def get_ack_callback(self, packet_id):
|
||||
'Get callback to be called after server sends an acknowledgment'
|
||||
callback = self._callback_by_packet_id[packet_id]
|
||||
del self._callback_by_packet_id[packet_id]
|
||||
return callback
|
||||
|
||||
@property
|
||||
def has_ack_callback(self):
|
||||
return True if self._callback_by_packet_id else False
|
||||
|
||||
|
||||
class _WebsocketTransport(_AbstractTransport):
|
||||
|
||||
def __init__(self, socketIO_session, secure, base_url, **kw):
|
||||
super(_WebsocketTransport, self).__init__()
|
||||
url = '%s://%s/websocket/%s' % (
|
||||
'wss' if secure else 'ws',
|
||||
base_url, socketIO_session.id)
|
||||
try:
|
||||
self._connection = websocket.create_connection(url)
|
||||
except socket.timeout as e:
|
||||
raise SocketIOConnectionError(e)
|
||||
except socket.error as e:
|
||||
raise SocketIOConnectionError(e)
|
||||
self._connection.settimeout(TIMEOUT_IN_SECONDS)
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
return self._connection.connected
|
||||
|
||||
def send(self, packet_text):
|
||||
try:
|
||||
self._connection.send(packet_text)
|
||||
except socket.error:
|
||||
raise SocketIOConnectionError('could not send %s' % packet_text)
|
||||
|
||||
def recv(self):
|
||||
try:
|
||||
yield self._connection.recv()
|
||||
except socket.timeout:
|
||||
raise _TimeoutError
|
||||
except socket.error as e:
|
||||
raise SocketIOConnectionError(e)
|
||||
except websocket.WebSocketConnectionClosedException as e:
|
||||
raise SocketIOConnectionError('connection closed (%s)' % e)
|
||||
|
||||
def close(self):
|
||||
self._connection.close()
|
||||
|
||||
|
||||
class _XHR_PollingTransport(_AbstractTransport):
|
||||
|
||||
def __init__(self, socketIO_session, secure, base_url, **kw):
|
||||
super(_XHR_PollingTransport, self).__init__()
|
||||
self._url = '%s://%s/xhr-polling/%s' % (
|
||||
'https' if secure else 'http',
|
||||
base_url, socketIO_session.id)
|
||||
self._connected = True
|
||||
self._http_session = _prepare_http_session(kw)
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
return self._connected
|
||||
|
||||
@property
|
||||
def _params(self):
|
||||
return dict(t=time.time())
|
||||
|
||||
def send(self, packet_text):
|
||||
_get_response(
|
||||
self._http_session.post,
|
||||
self._url,
|
||||
params=self._params,
|
||||
data=packet_text,
|
||||
timeout=TIMEOUT_IN_SECONDS)
|
||||
|
||||
def recv(self):
|
||||
response = _get_response(
|
||||
self._http_session.get,
|
||||
self._url,
|
||||
params=self._params,
|
||||
timeout=TIMEOUT_IN_SECONDS)
|
||||
encoded_text = response.text.encode('utf-8')
|
||||
if not encoded_text.startswith(BOUNDARY):
|
||||
yield encoded_text.decode('utf-8')
|
||||
for packet_text in _yield_text_from_framed_data(encoded_text):
|
||||
yield packet_text
|
||||
|
||||
def close(self):
|
||||
_get_response(
|
||||
self._http_session.get,
|
||||
self._url,
|
||||
params=dict(self._params.items() + [('disconnect', True)]))
|
||||
self._connected = False
|
||||
|
||||
|
||||
class _JSONP_PollingTransport(_AbstractTransport):
|
||||
|
||||
DATA_PATTERN = re.compile(r'io.j\[(\d+)\]\("(.*)"\);')
|
||||
|
||||
def __init__(self, socketIO_session, secure, base_url, **kw):
|
||||
super(_JSONP_PollingTransport, self).__init__()
|
||||
self._url = '%s://%s/jsonp-polling/%s' % (
|
||||
'https' if secure else 'http',
|
||||
base_url, socketIO_session.id)
|
||||
self._connected = True
|
||||
self._http_session = _prepare_http_session(kw)
|
||||
self._jsonp_id = 0
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
return self._connected
|
||||
|
||||
@property
|
||||
def _params(self):
|
||||
return dict(t=time.time(), jsonp=self._jsonp_id)
|
||||
|
||||
def send(self, packet_text):
|
||||
_get_response(
|
||||
self._http_session.post,
|
||||
self._url,
|
||||
params=self._params,
|
||||
data='d=%s' % requests.utils.quote(packet_text),
|
||||
headers={'content-type': 'application/x-www-form-urlencoded'},
|
||||
timeout=TIMEOUT_IN_SECONDS)
|
||||
|
||||
def recv(self):
|
||||
response = _get_response(
|
||||
self._http_session.get,
|
||||
self._url,
|
||||
params=self._params,
|
||||
headers={'content-type': 'application/javascript'},
|
||||
timeout=TIMEOUT_IN_SECONDS)
|
||||
encoded_text = response.text.encode('utf-8')
|
||||
if not encoded_text.startswith(BOUNDARY):
|
||||
self._jsonp_id, encoded_data = self.DATA_PATTERN.match(
|
||||
encoded_text).groups()
|
||||
yield encoded_data.decode('utf-8')
|
||||
for packet_text in _yield_text_from_framed_data(encoded_text):
|
||||
yield packet_text
|
||||
|
||||
def close(self):
|
||||
_get_response(
|
||||
self._http_session.get,
|
||||
self._url,
|
||||
params=dict(self._params.items() + [('disconnect', True)]))
|
||||
self._connected = False
|
||||
|
||||
|
||||
def _negotiate_transport(
|
||||
client_supported_transports, session,
|
||||
secure, base_url, **kw):
|
||||
server_supported_transports = session.server_supported_transports
|
||||
for supported_transport in client_supported_transports:
|
||||
if supported_transport in server_supported_transports:
|
||||
_log.debug('[transport selected] %s', supported_transport)
|
||||
return {
|
||||
'websocket': _WebsocketTransport,
|
||||
'xhr-polling': _XHR_PollingTransport,
|
||||
'jsonp-polling': _JSONP_PollingTransport,
|
||||
}[supported_transport](session, secure, base_url, **kw)
|
||||
raise SocketIOError(' '.join([
|
||||
'could not negotiate a transport:',
|
||||
'client supports %s but' % ', '.join(client_supported_transports),
|
||||
'server supports %s' % ', '.join(server_supported_transports),
|
||||
]))
|
||||
|
||||
|
||||
def _yield_text_from_framed_data(framed_data):
|
||||
parts = [x.decode('utf-8') for x in framed_data.split(BOUNDARY)]
|
||||
for text_length, text in izip(parts[1::2], parts[2::2]):
|
||||
if text_length == str(len(text)):
|
||||
yield text
|
||||
warning = 'invalid declared length=%s for packet_text=%s' % (
|
||||
text_length, text)
|
||||
_log.warn('[packet error] %s', warning)
|
||||
|
||||
|
||||
def _get_response(request, *args, **kw):
|
||||
try:
|
||||
response = request(*args, **kw)
|
||||
except requests.exceptions.Timeout as e:
|
||||
raise _TimeoutError(e)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
raise SocketIOConnectionError(e)
|
||||
except requests.exceptions.SSLError as e:
|
||||
raise SocketIOConnectionError('could not negotiate SSL (%s)' % e)
|
||||
status = response.status_code
|
||||
if 200 != status:
|
||||
raise SocketIOConnectionError('unexpected status code (%s)' % status)
|
||||
return response
|
||||
|
||||
|
||||
def _prepare_http_session(kw):
|
||||
http_session = requests.Session()
|
||||
http_session.headers.update(kw.get('headers', {}))
|
||||
http_session.auth = kw.get('auth')
|
||||
http_session.proxies.update(kw.get('proxies', {}))
|
||||
http_session.hooks.update(kw.get('hooks', {}))
|
||||
http_session.params.update(kw.get('params', {}))
|
||||
http_session.verify = kw.get('verify')
|
||||
http_session.cert = kw.get('cert')
|
||||
http_session.cookies.update(kw.get('cookies', {}))
|
||||
return http_session
|
||||
Loading…
Add table
Add a link
Reference in a new issue