Start separating transport

This commit is contained in:
Roy Hyunjin Han 2015-02-20 15:38:53 -05:00
commit 96014d3d53
6 changed files with 374 additions and 95 deletions

View file

@ -1,6 +1,4 @@
Release 0.6.1 #41 #52
Implement on
Send pong
Change print statements into logging statements
Clean up

View file

@ -6,14 +6,13 @@ import time
from collections import namedtuple
from .compat import get_byte, get_character, get_unicode
from .exceptions import PacketError, TimeoutError
from .exceptions import ConnectionError, TimeoutError, PacketError
from .transports import _get_response
__version__ = '0.6.1'
_log = logging.getLogger(__name__)
EngineIOData = namedtuple('EngineIOData', ['data'])
SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'event', 'args'])
SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'args'])
TRANSPORTS = []
RETRY_INTERVAL_IN_SECONDS = 1
@ -27,18 +26,40 @@ class EngineIONamespace(object):
self.initialize()
def initialize(self):
'Initialize custom variables here; you can override this method'
pass
"""Initialize custom variables here.
You can override this method."""
def on_event(self, event, *args):
"""
Called after server sends an event; you can override this method.
Called only if a custom event handler does not exist,
such as one defined by namespace.on('my_event', my_function).
"""
callback, args = find_callback(args)
if callback:
callback(*args)
def on(self, event, callback):
'Define a callback to handle an event emitted by the server'
self._callback_by_event[event] = callback
def on_open(self):
"""Called after engine.io connects.
You can override this method."""
def on_close(self):
"""Called after engine.io disconnects.
You can override this method."""
def on_ping(self, data):
"""Called after engine.io sends a ping packet.
You can override this method."""
def on_pong(self, data):
"""Called after engine.io sends a pong packet.
You can override this method."""
def on_message(self, data):
"""Called after engine.io sends a message packet.
You can override this method."""
def on_upgrade(self):
"""Called after engine.io sends an upgrade packet.
You can override this method."""
def on_noop(self):
"""Called after engine.io sends a noop packet.
You can override this method."""
def _find_packet_callback(self, event):
# Check callbacks defined by on()
@ -46,10 +67,8 @@ class EngineIONamespace(object):
return self._callback_by_event[event]
except KeyError:
pass
# Check callbacks defined explicitly or use on_event()
return getattr(
self, 'on_' + event.replace(' ', '_'),
lambda *args: self.on_event(event, *args))
# Check callbacks defined explicitly
return getattr(self, 'on_' + event)
class SocketIONamespace(EngineIONamespace):
@ -60,13 +79,45 @@ class SocketIONamespace(EngineIONamespace):
super(SocketIONamespace, self).__init__(io)
def on_connect(self):
'Called after server connects; you can override this method'
"""Called after socket.io connects.
You can override this method."""
def on_reconnect(self):
'Called after server reconnects; you can override this method'
"""Called after socket.io reconnects.
You can override this method."""
def on_disconnect(self):
'Called after server disconnects; you can override this method'
"""Called after socket.io disconnects.
You can override this method."""
def on_event(self, event, *args):
"""
Called if there is no matching event handler.
You can override this method.
There are three ways to define an event handler:
- Call socketIO.on()
socketIO = SocketIO('localhost', 8000)
socketIO.on('my_event', my_function)
- Call namespace.on()
namespace = socketIO.get_namespace()
namespace.on('my_event', my_function)
- Define namespace.on_xxx
class Namespace(SocketIONamespace):
def on_my_event(self, *args):
my_function(*args)
socketIO.define(Namespace)"""
def on_error(self, data):
"""Called after socket.io sends an error packet.
You can override this method."""
def _find_packet_callback(self, event):
# Interpret events
@ -75,7 +126,15 @@ class SocketIONamespace(EngineIONamespace):
self._was_connected = True
else:
event = 'reconnect'
return super(SocketIONamespace, self)._find_packet_callback(event)
# Check callbacks defined by on()
try:
return self._callback_by_event[event]
except KeyError:
pass
# Check callbacks defined explicitly or use on_event()
return getattr(
self, 'on_' + event.replace(' ', '_'),
lambda *args: self.on_event(event, *args))
class LoggingMixin(object):
@ -134,21 +193,17 @@ class EngineIO(object):
resource='engine.io', **kw):
self._url = 'http://%s:%s/%s/' % (host, port, resource)
self._http_session = requests.Session()
print(self._url)
response = self._http_session.get(self._url, params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
})
print(response.url)
engineIO_packets = _decode_content(response.content)
engineIO_packets = _decode_engineIO_content(response.content)
engineIO_packet_type, engineIO_packet_data = engineIO_packets[0]
assert engineIO_packet_type == 0
value_by_name = json.loads(get_unicode(engineIO_packet_data))
print(value_by_name)
# value_by_name['upgrades']
# 'websocket' in value_by_name['upgrades']
self._ping_interval = value_by_name['pingInterval'] / float(1000)
self._ping_timeout = value_by_name['pingTimeout'] / float(1000)
self._session_id = value_by_name['sid']
@ -162,6 +217,22 @@ class EngineIO(object):
hurry_interval_in_seconds=1)
self._heartbeat_thread.start()
@property
def connected(self):
try:
transport = self.__transport
except AttributeError:
return False
else:
return transport.connected
def on(self, event, callback):
try:
namespace = self.get_namespace()
except PacketError:
namespace = self.define(EngineIONamespace)
return namespace.on(event, callback)
def define(self, Namespace):
self._namespace = namespace = Namespace(self)
return namespace
@ -172,16 +243,36 @@ class EngineIO(object):
except AttributeError:
raise PacketError('undefined engine.io namespace')
def wait(self, seconds=None):
def wait(self, seconds=None, **kw):
'Wait in a loop and react to events as defined in the namespaces'
self._heartbeat_thread.hurry()
warning_screen = _yield_warning_screen(seconds)
for elapsed_time in warning_screen:
if self._should_stop_waiting(**kw):
break
try:
self._process_packets()
except TimeoutError:
pass
try:
self._process_packets()
except TimeoutError:
pass
except ConnectionError as e:
try:
warning = Exception('[connection error] %s' % e)
warning_screen.throw(warning)
except StopIteration:
self._log(logging.WARNING, warning)
try:
namespace = self.get_namespace()
namespace.on_disconnect()
except PacketError:
pass
self._heartbeat_thread.relax()
def _should_stop_waiting(self):
# Use __transport to make sure that we do not reconnect inadvertently
return self.__transport._wants_to_disconnect
def _process_packets(self):
for engineIO_packet in self._recv_packet():
try:
@ -192,8 +283,6 @@ class EngineIO(object):
def _process_packet(self, packet):
engineIO_packet_type, engineIO_packet_data = packet
print('engineIO_packet_type = %s' % engineIO_packet_type)
engineIO_packet_data_parsed = _parse_engineIO_data(
engineIO_packet_data)
# Launch callbacks
namespace = self.get_namespace()
try:
@ -209,30 +298,31 @@ class EngineIO(object):
except KeyError:
raise PacketError(
'unexpected engine.io packet type (%s)' % engineIO_packet_type)
delegate(engineIO_packet_data_parsed, namespace._find_packet_callback)
delegate(engineIO_packet_data, namespace._find_packet_callback)
if engineIO_packet_type is 4:
return engineIO_packet_data
def _on_open(self, data_parsed, find_packet_callback):
pass
def _on_open(self, data, find_packet_callback):
find_packet_callback('open')()
def _on_close(self, data_parsed, find_packet_callback):
pass
def _on_close(self, data, find_packet_callback):
find_packet_callback('close')()
def _on_ping(self, data_parsed, find_packet_callback):
pass
def _on_ping(self, data, find_packet_callback):
self._pong(data)
find_packet_callback('ping')(data)
def _on_pong(self, data_parsed, find_packet_callback):
pass
def _on_pong(self, data, find_packet_callback):
find_packet_callback('pong')(data)
def _on_message(self, data_parsed, find_packet_callback):
pass
def _on_message(self, data, find_packet_callback):
find_packet_callback('message')(data)
def _on_upgrade(self, data_parsed, find_packet_callback):
pass
def _on_upgrade(self, data, find_packet_callback):
find_packet_callback('upgrade')()
def _on_noop(self, data_parsed, find_packet_callback):
pass
def _on_noop(self, data, find_packet_callback):
find_packet_callback('noop')()
def _get_timestamp(self):
timestamp = '%s-%s' % (
@ -247,13 +337,14 @@ class EngineIO(object):
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
}, data=_encode_content([
}, data=_encode_engineIO_content([
(engineIO_packet_type, engineIO_packet_data),
]), headers={
'content-type': 'application/octet-stream',
})
print('message()')
engineIO_packets = _decode_content(response.content)
print(response.content)
engineIO_packets = _decode_engineIO_content(response.content)
for engineIO_packet_type, engineIO_packet_data in engineIO_packets:
socketIO_packet_type = int(get_character(engineIO_packet_data, 0))
socketIO_packet_data = engineIO_packet_data[1:]
@ -261,21 +352,43 @@ class EngineIO(object):
print('socketIO_packet_type = %s' % socketIO_packet_type)
print('socketIO_packet_data = %s' % socketIO_packet_data)
def _ping(self):
def _ping(self, engineIO_packet_data=''):
engineIO_packet_type = 2
engineIO_packet_data = ''
response = self._http_session.post(self._url, params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
}, data=_encode_content([
}, data=_encode_engineIO_content([
(engineIO_packet_type, engineIO_packet_data),
]), headers={
'content-type': 'application/octet-stream',
})
print('ping()')
engineIO_packets = _decode_content(response.content)
print(response.content)
engineIO_packets = _decode_engineIO_content(response.content)
for engineIO_packet_type, engineIO_packet_data in engineIO_packets:
socketIO_packet_type = int(get_character(engineIO_packet_data, 0))
socketIO_packet_data = engineIO_packet_data[1:]
print('engineIO_packet_type = %s' % engineIO_packet_type)
print('socketIO_packet_type = %s' % socketIO_packet_type)
print('socketIO_packet_data = %s' % socketIO_packet_data)
def _pong(self, engineIO_packet_data=''):
engineIO_packet_type = 3
response = self._http_session.post(self._url, params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
}, data=_encode_engineIO_content([
(engineIO_packet_type, engineIO_packet_data),
]), headers={
'content-type': 'application/octet-stream',
})
print('pong()')
print(response.content)
engineIO_packets = _decode_engineIO_content(response.content)
for engineIO_packet_type, engineIO_packet_data in engineIO_packets:
socketIO_packet_type = int(get_character(engineIO_packet_data, 0))
socketIO_packet_data = engineIO_packet_data[1:]
@ -294,7 +407,7 @@ class EngineIO(object):
'sid': self._session_id,
},
timeout=self._ping_timeout)
for engineIO_packet in _decode_content(response.content):
for engineIO_packet in _decode_engineIO_content(response.content):
yield engineIO_packet
def _log(self, level, msg, *attrs):
@ -309,11 +422,20 @@ class SocketIO(EngineIO):
wait_for_connection=True, transports=TRANSPORTS,
resource='socket.io', **kw):
self._namespace_by_path = {}
self._callback_by_ack_id = {}
self._ack_id = 0
super(SocketIO, self).__init__(
host, port, Namespace,
wait_for_connection, transports,
resource, **kw)
def on(self, event, callback, path=''):
try:
namespace = self.get_namespace(path)
except PacketError:
namespace = self.define(SocketIONamespace, path)
return namespace.on(event, callback)
def define(self, Namespace, path=''):
if path:
self._connect(path)
@ -326,13 +448,45 @@ class SocketIO(EngineIO):
except KeyError:
raise PacketError('undefined socket.io namespace (%s)' % path)
def wait(self, seconds=None, for_callbacks=False):
super(SocketIO, self).wait(seconds, for_callbacks=for_callbacks)
def wait_for_callbacks(self, seconds=None):
self.wait(seconds, for_callbacks=True)
def _should_stop_waiting(self, for_callbacks):
# Use __transport to make sure that we do not reconnect inadvertently
if for_callbacks and not self.__transport.has_ack_callback:
return True
return super(SocketIO, self)._should_stop_waiting()
def emit(self, event, *args, **kw):
path = kw.get('path', '')
callback, args = find_callback(args, kw)
self._emit(path, event, args, callback)
def _emit(self, path, event, args, callback):
socketIO_packet_type = 2
socketIO_packet_data = json.dumps([event])
socketIO_packet_data = json.dumps([event] + list(args))
if callback:
ack_id = self._set_ack_callback(callback) if callback else ''
socketIO_packet_data = str(ack_id) + socketIO_packet_data
if path:
socketIO_packet_data = path + ',' + socketIO_packet_data
self._message(str(socketIO_packet_type) + socketIO_packet_data)
def on(self, event, callback):
pass
def _set_ack_callback(self, callback):
self._ack_id += 1
self._callback_by_ack_id[self._ack_id] = callback
return self._ack_id
def send(self, data='', callback=None):
args = [data]
if callback:
args.append(callback)
self.emit('message', *args)
def _process_packet(self, packet):
engineIO_packet_data = super(SocketIO, self)._process_packet(packet)
@ -341,8 +495,6 @@ class SocketIO(EngineIO):
socketIO_packet_type = int(get_character(engineIO_packet_data, 0))
socketIO_packet_data = engineIO_packet_data[1:]
print('socketIO_packet_type = %s' % socketIO_packet_type)
socketIO_packet_data_parsed = _parse_socketIO_data(
socketIO_packet_data)
# Launch callbacks
namespace = self.get_namespace()
try:
@ -358,33 +510,46 @@ class SocketIO(EngineIO):
except KeyError:
raise PacketError(
'unexpected socket.io packet type (%s)' % socketIO_packet_type)
delegate(socketIO_packet_data_parsed, namespace._find_packet_callback)
delegate(socketIO_packet_data, namespace._find_packet_callback)
return socketIO_packet_data
def _on_connect(self, data_parsed, find_packet_callback):
def _on_connect(self, data, find_packet_callback):
find_packet_callback('connect')()
def _on_disconnect(self, data_parsed, find_packet_callback):
def _on_disconnect(self, data, find_packet_callback):
find_packet_callback('disconnect')()
def _on_event(self, data_parsed, find_packet_callback):
def _on_event(self, data, find_packet_callback):
data_parsed = _parse_socketIO_data(data)
args = data_parsed.args
try:
event = args.pop(0)
except IndexError:
raise PacketError('missing event name')
if data_parsed.ack_id:
args.append(self._prepare_to_send_ack(
data_parsed.path, data_parsed.ack_id))
find_packet_callback(data_parsed.event)(*args)
find_packet_callback(event)(*args)
def _on_ack(self, data_parsed, find_packet_callback):
pass
def _on_ack(self, data, find_packet_callback):
data_parsed = _parse_socketIO_data(data)
try:
ack_callback = self._get_ack_callback(data_parsed.ack_id)
except KeyError:
return
ack_callback(*data_parsed.args)
def _on_error(self, data_parsed, find_packet_callback):
pass
def _get_ack_callback(self, ack_id):
return self._callback_by_ack_id.pop(ack_id)
def _on_binary_event(self, data_parsed, find_packet_callback):
pass
def _on_error(self, data, find_packet_callback):
find_packet_callback('error')(data)
def _on_binary_ack(self, data_parsed, find_packet_callback):
pass
def _on_binary_event(self, data, find_packet_callback):
self._log(logging.WARNING, '[not implemented] binary event')
def _on_binary_ack(self, data, find_packet_callback):
self._log(logging.WARNING, '[not implemented] binary ack')
def _prepare_to_send_ack(self, path, ack_id):
'Return function that acknowledges the server'
@ -421,6 +586,8 @@ class HeartbeatThread(threading.Thread):
def hurry(self):
self._adrenaline.set()
self._rest.set()
self._rest.clear()
def stop(self):
self._stop.set()
@ -436,7 +603,7 @@ def find_callback(args, kw=None):
return None, args
def _decode_content(content):
def _decode_engineIO_content(content):
packets = []
content_index = 0
content_length = len(content)
@ -454,7 +621,7 @@ def _decode_content(content):
return packets
def _encode_content(packets):
def _encode_engineIO_content(packets):
parts = []
for packet_type, packet_data in packets:
packet_string = str(packet_type) + str(packet_data)
@ -491,10 +658,6 @@ def _make_packet_header(packet_string):
return ''.join(chr(x) for x in header_digits)
def _parse_engineIO_data(data):
return EngineIOData(data=get_unicode(data))
def _parse_socketIO_data(data):
data = get_unicode(data)
if data.startswith('/'):
@ -510,14 +673,11 @@ def _parse_socketIO_data(data):
data = data[1:]
except (ValueError, IndexError):
ack_id = None
if data:
x = json.loads(data)
event = x[0]
args = x[1:]
else:
event = ''
try:
args = json.loads(data)
except ValueError:
args = []
return SocketIOData(path=path, ack_id=ack_id, event=event, args=args)
return SocketIOData(path=path, ack_id=ack_id, args=args)
def _yield_warning_screen(seconds=None):

View file

@ -6,11 +6,19 @@ from .. import SocketIO, LoggingSocketIONamespace, find_callback
HOST = 'localhost'
PORT = 8000
DATA = 'xxx'
PAYLOAD = {'xxx': 'yyy'}
logging.basicConfig(level=logging.DEBUG)
class BaseMixin(object):
def setUp(self):
self.called_on_response = False
def tearDown(self):
del self.socketIO
def test_emit(self):
'Emit'
namespace = self.socketIO.define(Namespace)
@ -20,6 +28,84 @@ class BaseMixin(object):
'emit_response': (),
})
def test_emit_with_payload(self):
'Emit with payload'
namespace = self.socketIO.define(Namespace)
self.socketIO.emit('emit_with_payload', PAYLOAD)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.args_by_event, {
'emit_with_payload_response': (PAYLOAD,),
})
def test_emit_with_multiple_payloads(self):
'Emit with multiple payloads'
namespace = self.socketIO.define(Namespace)
self.socketIO.emit('emit_with_multiple_payloads', PAYLOAD, PAYLOAD)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.args_by_event, {
'emit_with_multiple_payloads_response': (PAYLOAD, PAYLOAD),
})
def test_emit_with_callback(self):
'Emit with callback'
self.socketIO.define(LoggingSocketIONamespace)
self.socketIO.emit('emit_with_callback', self.on_response)
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
self.assertTrue(self.called_on_response)
def test_emit_with_callback_with_payload(self):
'Emit with callback with payload'
self.socketIO.define(LoggingSocketIONamespace)
self.socketIO.emit(
'emit_with_callback_with_payload', self.on_response)
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
self.assertTrue(self.called_on_response)
def test_emit_with_callback_with_multiple_payloads(self):
'Emit with callback with multiple payloads'
self.socketIO.define(LoggingSocketIONamespace)
self.socketIO.emit(
'emit_with_callback_with_multiple_payloads', self.on_response)
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
self.assertTrue(self.called_on_response)
def test_emit_with_event(self):
'Emit to trigger an event'
self.socketIO.on('emit_with_event_response', self.on_response)
self.socketIO.emit('emit_with_event', PAYLOAD)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertTrue(self.called_on_response)
def test_send(self):
'Send'
namespace = self.socketIO.define(Namespace)
self.socketIO.send()
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.response, 'message_response')
def test_send_with_data(self):
'Send with data'
namespace = self.socketIO.define(Namespace)
self.socketIO.send(DATA)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.response, DATA)
def on_response(self, *args):
for arg in args:
if isinstance(arg, dict):
self.assertEqual(arg, PAYLOAD)
else:
self.assertEqual(arg, DATA)
self.called_on_response = True
# class Test_WebsocketTransport(TestCase, BaseMixin):
# def setUp(self):
# super(Test_WebsocketTransport, self).setUp()
# self.socketIO = SocketIO(HOST, PORT, transports=['websocket'])
# self.wait_time_in_seconds = 0.1
class Test_XHR_PollingTransport(TestCase, BaseMixin):
@ -32,10 +118,21 @@ class Test_XHR_PollingTransport(TestCase, BaseMixin):
class Namespace(LoggingSocketIONamespace):
def initialize(self):
self.called_on_disconnect = False
self.args_by_event = {}
self.response = None
def on_disconnect(self):
self.called_on_disconnect = True
def on_wait_with_disconnect_response(self):
self.disconnect()
def on_event(self, event, *args):
callback, args = find_callback(args)
if callback:
callback(*args)
self.args_by_event[event] = args
def on_message(self, data):
self.response = data

View file

@ -1,4 +1,25 @@
<script src="/socket.io/socket.io.js"></script>
<script>
var socket = io('http://localhost');
var chat = io('/chat');
var news = io('/news');
socket.on('server_expects_callback', function(payload, fn) {fn(payload)});
socket.emit('trigger_server_expects_callback', 'whee');
socket.emit('emit');
socket.emit('emit_with_payload');
socket.emit('emit_with_multiple_payloads', 'aaa', 'bbb');
socket.emit('emit_with_callback', function() {console.log('whee')});
socket.emit('emit_with_callback_with_payload', function(x) {console.log('whee ' + x)});
socket.emit('emit_with_callback_with_multiple_payloads', function(x, y) {console.log('whee ' + x + ' ' + y)});
socket.emit('emit_with_event');
socket.emit('aaa');
chat.on('server_expects_callback', function(payload, fn) {fn(payload)});
chat.emit('trigger_server_expects_callback', 'whee');
chat.emit('emit_with_payload');
chat.emit('aaa');
news.emit('emit_with_payload');
news.emit('aaa');
</script>

View file

@ -1,5 +1,8 @@
var proxy = require('http-proxy').createProxyServer({
target: {host: 'localhost', port: 9000}
}).on('error', function(err, req, res) {
console.log('[ERROR] %s', err);
res.end();
});
var server = require('http').createServer(function(req, res) {
console.log('[REQUEST.%s] %s', req.method, req.url);

View file

@ -43,9 +43,9 @@ io.on('connection', function(socket) {
socket.on('emit_with_event', function(payload) {
socket.emit('emit_with_event_response', payload);
});
socket.on('ack', function(payload) {
socket.emit('ack_response', payload, function(payload) {
socket.emit('ack_callback_response', payload);
socket.on('trigger_server_expects_callback', function(payload) {
socket.emit('server_expects_callback', payload, function(payload) {
socket.emit('server_received_callback', payload);
});
});
socket.on('aaa', function() {
@ -66,9 +66,9 @@ io.of('/chat').on('connection', function(socket) {
socket.on('aaa', function() {
socket.emit('aaa_response', 'in chat');
});
socket.on('ack', function(payload) {
socket.emit('ack_response', payload, function(payload) {
socket.emit('ack_callback_response', payload);
socket.on('trigger_server_expects_callback', function(payload) {
socket.emit('server_expects_callback', payload, function(payload) {
socket.emit('server_received_callback', payload);
});
});
});