diff --git a/TODO.goals b/TODO.goals index 09ff239..619a558 100644 --- a/TODO.goals +++ b/TODO.goals @@ -1,6 +1,4 @@ Release 0.6.1 #41 #52 - Implement on - Send pong Change print statements into logging statements Clean up diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 61306f3..d390fda 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -6,14 +6,13 @@ import time from collections import namedtuple from .compat import get_byte, get_character, get_unicode -from .exceptions import PacketError, TimeoutError +from .exceptions import ConnectionError, TimeoutError, PacketError from .transports import _get_response __version__ = '0.6.1' _log = logging.getLogger(__name__) -EngineIOData = namedtuple('EngineIOData', ['data']) -SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'event', 'args']) +SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'args']) TRANSPORTS = [] RETRY_INTERVAL_IN_SECONDS = 1 @@ -27,18 +26,40 @@ class EngineIONamespace(object): self.initialize() def initialize(self): - 'Initialize custom variables here; you can override this method' - pass + """Initialize custom variables here. + You can override this method.""" - def on_event(self, event, *args): - """ - Called after server sends an event; you can override this method. - Called only if a custom event handler does not exist, - such as one defined by namespace.on('my_event', my_function). - """ - callback, args = find_callback(args) - if callback: - callback(*args) + def on(self, event, callback): + 'Define a callback to handle an event emitted by the server' + self._callback_by_event[event] = callback + + def on_open(self): + """Called after engine.io connects. + You can override this method.""" + + def on_close(self): + """Called after engine.io disconnects. + You can override this method.""" + + def on_ping(self, data): + """Called after engine.io sends a ping packet. + You can override this method.""" + + def on_pong(self, data): + """Called after engine.io sends a pong packet. + You can override this method.""" + + def on_message(self, data): + """Called after engine.io sends a message packet. + You can override this method.""" + + def on_upgrade(self): + """Called after engine.io sends an upgrade packet. + You can override this method.""" + + def on_noop(self): + """Called after engine.io sends a noop packet. + You can override this method.""" def _find_packet_callback(self, event): # Check callbacks defined by on() @@ -46,10 +67,8 @@ class EngineIONamespace(object): return self._callback_by_event[event] except KeyError: pass - # Check callbacks defined explicitly or use on_event() - return getattr( - self, 'on_' + event.replace(' ', '_'), - lambda *args: self.on_event(event, *args)) + # Check callbacks defined explicitly + return getattr(self, 'on_' + event) class SocketIONamespace(EngineIONamespace): @@ -60,13 +79,45 @@ class SocketIONamespace(EngineIONamespace): super(SocketIONamespace, self).__init__(io) def on_connect(self): - 'Called after server connects; you can override this method' + """Called after socket.io connects. + You can override this method.""" def on_reconnect(self): - 'Called after server reconnects; you can override this method' + """Called after socket.io reconnects. + You can override this method.""" def on_disconnect(self): - 'Called after server disconnects; you can override this method' + """Called after socket.io disconnects. + You can override this method.""" + + def on_event(self, event, *args): + """ + Called if there is no matching event handler. + You can override this method. + There are three ways to define an event handler: + + - Call socketIO.on() + + socketIO = SocketIO('localhost', 8000) + socketIO.on('my_event', my_function) + + - Call namespace.on() + + namespace = socketIO.get_namespace() + namespace.on('my_event', my_function) + + - Define namespace.on_xxx + + class Namespace(SocketIONamespace): + + def on_my_event(self, *args): + my_function(*args) + + socketIO.define(Namespace)""" + + def on_error(self, data): + """Called after socket.io sends an error packet. + You can override this method.""" def _find_packet_callback(self, event): # Interpret events @@ -75,7 +126,15 @@ class SocketIONamespace(EngineIONamespace): self._was_connected = True else: event = 'reconnect' - return super(SocketIONamespace, self)._find_packet_callback(event) + # Check callbacks defined by on() + try: + return self._callback_by_event[event] + except KeyError: + pass + # Check callbacks defined explicitly or use on_event() + return getattr( + self, 'on_' + event.replace(' ', '_'), + lambda *args: self.on_event(event, *args)) class LoggingMixin(object): @@ -134,21 +193,17 @@ class EngineIO(object): resource='engine.io', **kw): self._url = 'http://%s:%s/%s/' % (host, port, resource) self._http_session = requests.Session() - print(self._url) response = self._http_session.get(self._url, params={ 'EIO': self._engineIO_protocol, 'transport': 'polling', 't': self._get_timestamp(), }) - print(response.url) - - engineIO_packets = _decode_content(response.content) + engineIO_packets = _decode_engineIO_content(response.content) engineIO_packet_type, engineIO_packet_data = engineIO_packets[0] assert engineIO_packet_type == 0 value_by_name = json.loads(get_unicode(engineIO_packet_data)) - print(value_by_name) - # value_by_name['upgrades'] + # 'websocket' in value_by_name['upgrades'] self._ping_interval = value_by_name['pingInterval'] / float(1000) self._ping_timeout = value_by_name['pingTimeout'] / float(1000) self._session_id = value_by_name['sid'] @@ -162,6 +217,22 @@ class EngineIO(object): hurry_interval_in_seconds=1) self._heartbeat_thread.start() + @property + def connected(self): + try: + transport = self.__transport + except AttributeError: + return False + else: + return transport.connected + + def on(self, event, callback): + try: + namespace = self.get_namespace() + except PacketError: + namespace = self.define(EngineIONamespace) + return namespace.on(event, callback) + def define(self, Namespace): self._namespace = namespace = Namespace(self) return namespace @@ -172,16 +243,36 @@ class EngineIO(object): except AttributeError: raise PacketError('undefined engine.io namespace') - def wait(self, seconds=None): + def wait(self, seconds=None, **kw): + 'Wait in a loop and react to events as defined in the namespaces' self._heartbeat_thread.hurry() warning_screen = _yield_warning_screen(seconds) for elapsed_time in warning_screen: + if self._should_stop_waiting(**kw): + break try: - self._process_packets() - except TimeoutError: - pass + try: + self._process_packets() + except TimeoutError: + pass + except ConnectionError as e: + try: + warning = Exception('[connection error] %s' % e) + warning_screen.throw(warning) + except StopIteration: + self._log(logging.WARNING, warning) + try: + namespace = self.get_namespace() + namespace.on_disconnect() + except PacketError: + pass + self._heartbeat_thread.relax() + def _should_stop_waiting(self): + # Use __transport to make sure that we do not reconnect inadvertently + return self.__transport._wants_to_disconnect + def _process_packets(self): for engineIO_packet in self._recv_packet(): try: @@ -192,8 +283,6 @@ class EngineIO(object): def _process_packet(self, packet): engineIO_packet_type, engineIO_packet_data = packet print('engineIO_packet_type = %s' % engineIO_packet_type) - engineIO_packet_data_parsed = _parse_engineIO_data( - engineIO_packet_data) # Launch callbacks namespace = self.get_namespace() try: @@ -209,30 +298,31 @@ class EngineIO(object): except KeyError: raise PacketError( 'unexpected engine.io packet type (%s)' % engineIO_packet_type) - delegate(engineIO_packet_data_parsed, namespace._find_packet_callback) + delegate(engineIO_packet_data, namespace._find_packet_callback) if engineIO_packet_type is 4: return engineIO_packet_data - def _on_open(self, data_parsed, find_packet_callback): - pass + def _on_open(self, data, find_packet_callback): + find_packet_callback('open')() - def _on_close(self, data_parsed, find_packet_callback): - pass + def _on_close(self, data, find_packet_callback): + find_packet_callback('close')() - def _on_ping(self, data_parsed, find_packet_callback): - pass + def _on_ping(self, data, find_packet_callback): + self._pong(data) + find_packet_callback('ping')(data) - def _on_pong(self, data_parsed, find_packet_callback): - pass + def _on_pong(self, data, find_packet_callback): + find_packet_callback('pong')(data) - def _on_message(self, data_parsed, find_packet_callback): - pass + def _on_message(self, data, find_packet_callback): + find_packet_callback('message')(data) - def _on_upgrade(self, data_parsed, find_packet_callback): - pass + def _on_upgrade(self, data, find_packet_callback): + find_packet_callback('upgrade')() - def _on_noop(self, data_parsed, find_packet_callback): - pass + def _on_noop(self, data, find_packet_callback): + find_packet_callback('noop')() def _get_timestamp(self): timestamp = '%s-%s' % ( @@ -247,13 +337,14 @@ class EngineIO(object): 'transport': 'polling', 't': self._get_timestamp(), 'sid': self._session_id, - }, data=_encode_content([ + }, data=_encode_engineIO_content([ (engineIO_packet_type, engineIO_packet_data), ]), headers={ 'content-type': 'application/octet-stream', }) print('message()') - engineIO_packets = _decode_content(response.content) + print(response.content) + engineIO_packets = _decode_engineIO_content(response.content) for engineIO_packet_type, engineIO_packet_data in engineIO_packets: socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) socketIO_packet_data = engineIO_packet_data[1:] @@ -261,21 +352,43 @@ class EngineIO(object): print('socketIO_packet_type = %s' % socketIO_packet_type) print('socketIO_packet_data = %s' % socketIO_packet_data) - def _ping(self): + def _ping(self, engineIO_packet_data=''): engineIO_packet_type = 2 - engineIO_packet_data = '' response = self._http_session.post(self._url, params={ 'EIO': self._engineIO_protocol, 'transport': 'polling', 't': self._get_timestamp(), 'sid': self._session_id, - }, data=_encode_content([ + }, data=_encode_engineIO_content([ (engineIO_packet_type, engineIO_packet_data), ]), headers={ 'content-type': 'application/octet-stream', }) print('ping()') - engineIO_packets = _decode_content(response.content) + print(response.content) + engineIO_packets = _decode_engineIO_content(response.content) + for engineIO_packet_type, engineIO_packet_data in engineIO_packets: + socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) + socketIO_packet_data = engineIO_packet_data[1:] + print('engineIO_packet_type = %s' % engineIO_packet_type) + print('socketIO_packet_type = %s' % socketIO_packet_type) + print('socketIO_packet_data = %s' % socketIO_packet_data) + + def _pong(self, engineIO_packet_data=''): + engineIO_packet_type = 3 + response = self._http_session.post(self._url, params={ + 'EIO': self._engineIO_protocol, + 'transport': 'polling', + 't': self._get_timestamp(), + 'sid': self._session_id, + }, data=_encode_engineIO_content([ + (engineIO_packet_type, engineIO_packet_data), + ]), headers={ + 'content-type': 'application/octet-stream', + }) + print('pong()') + print(response.content) + engineIO_packets = _decode_engineIO_content(response.content) for engineIO_packet_type, engineIO_packet_data in engineIO_packets: socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) socketIO_packet_data = engineIO_packet_data[1:] @@ -294,7 +407,7 @@ class EngineIO(object): 'sid': self._session_id, }, timeout=self._ping_timeout) - for engineIO_packet in _decode_content(response.content): + for engineIO_packet in _decode_engineIO_content(response.content): yield engineIO_packet def _log(self, level, msg, *attrs): @@ -309,11 +422,20 @@ class SocketIO(EngineIO): wait_for_connection=True, transports=TRANSPORTS, resource='socket.io', **kw): self._namespace_by_path = {} + self._callback_by_ack_id = {} + self._ack_id = 0 super(SocketIO, self).__init__( host, port, Namespace, wait_for_connection, transports, resource, **kw) + def on(self, event, callback, path=''): + try: + namespace = self.get_namespace(path) + except PacketError: + namespace = self.define(SocketIONamespace, path) + return namespace.on(event, callback) + def define(self, Namespace, path=''): if path: self._connect(path) @@ -326,13 +448,45 @@ class SocketIO(EngineIO): except KeyError: raise PacketError('undefined socket.io namespace (%s)' % path) + def wait(self, seconds=None, for_callbacks=False): + super(SocketIO, self).wait(seconds, for_callbacks=for_callbacks) + + def wait_for_callbacks(self, seconds=None): + self.wait(seconds, for_callbacks=True) + + def _should_stop_waiting(self, for_callbacks): + # Use __transport to make sure that we do not reconnect inadvertently + if for_callbacks and not self.__transport.has_ack_callback: + return True + return super(SocketIO, self)._should_stop_waiting() + def emit(self, event, *args, **kw): + path = kw.get('path', '') + callback, args = find_callback(args, kw) + self._emit(path, event, args, callback) + + def _emit(self, path, event, args, callback): socketIO_packet_type = 2 - socketIO_packet_data = json.dumps([event]) + + socketIO_packet_data = json.dumps([event] + list(args)) + if callback: + ack_id = self._set_ack_callback(callback) if callback else '' + socketIO_packet_data = str(ack_id) + socketIO_packet_data + if path: + socketIO_packet_data = path + ',' + socketIO_packet_data + self._message(str(socketIO_packet_type) + socketIO_packet_data) - def on(self, event, callback): - pass + def _set_ack_callback(self, callback): + self._ack_id += 1 + self._callback_by_ack_id[self._ack_id] = callback + return self._ack_id + + def send(self, data='', callback=None): + args = [data] + if callback: + args.append(callback) + self.emit('message', *args) def _process_packet(self, packet): engineIO_packet_data = super(SocketIO, self)._process_packet(packet) @@ -341,8 +495,6 @@ class SocketIO(EngineIO): socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) socketIO_packet_data = engineIO_packet_data[1:] print('socketIO_packet_type = %s' % socketIO_packet_type) - socketIO_packet_data_parsed = _parse_socketIO_data( - socketIO_packet_data) # Launch callbacks namespace = self.get_namespace() try: @@ -358,33 +510,46 @@ class SocketIO(EngineIO): except KeyError: raise PacketError( 'unexpected socket.io packet type (%s)' % socketIO_packet_type) - delegate(socketIO_packet_data_parsed, namespace._find_packet_callback) + delegate(socketIO_packet_data, namespace._find_packet_callback) return socketIO_packet_data - def _on_connect(self, data_parsed, find_packet_callback): + def _on_connect(self, data, find_packet_callback): find_packet_callback('connect')() - def _on_disconnect(self, data_parsed, find_packet_callback): + def _on_disconnect(self, data, find_packet_callback): find_packet_callback('disconnect')() - def _on_event(self, data_parsed, find_packet_callback): + def _on_event(self, data, find_packet_callback): + data_parsed = _parse_socketIO_data(data) args = data_parsed.args + try: + event = args.pop(0) + except IndexError: + raise PacketError('missing event name') if data_parsed.ack_id: args.append(self._prepare_to_send_ack( data_parsed.path, data_parsed.ack_id)) - find_packet_callback(data_parsed.event)(*args) + find_packet_callback(event)(*args) - def _on_ack(self, data_parsed, find_packet_callback): - pass + def _on_ack(self, data, find_packet_callback): + data_parsed = _parse_socketIO_data(data) + try: + ack_callback = self._get_ack_callback(data_parsed.ack_id) + except KeyError: + return + ack_callback(*data_parsed.args) - def _on_error(self, data_parsed, find_packet_callback): - pass + def _get_ack_callback(self, ack_id): + return self._callback_by_ack_id.pop(ack_id) - def _on_binary_event(self, data_parsed, find_packet_callback): - pass + def _on_error(self, data, find_packet_callback): + find_packet_callback('error')(data) - def _on_binary_ack(self, data_parsed, find_packet_callback): - pass + def _on_binary_event(self, data, find_packet_callback): + self._log(logging.WARNING, '[not implemented] binary event') + + def _on_binary_ack(self, data, find_packet_callback): + self._log(logging.WARNING, '[not implemented] binary ack') def _prepare_to_send_ack(self, path, ack_id): 'Return function that acknowledges the server' @@ -421,6 +586,8 @@ class HeartbeatThread(threading.Thread): def hurry(self): self._adrenaline.set() + self._rest.set() + self._rest.clear() def stop(self): self._stop.set() @@ -436,7 +603,7 @@ def find_callback(args, kw=None): return None, args -def _decode_content(content): +def _decode_engineIO_content(content): packets = [] content_index = 0 content_length = len(content) @@ -454,7 +621,7 @@ def _decode_content(content): return packets -def _encode_content(packets): +def _encode_engineIO_content(packets): parts = [] for packet_type, packet_data in packets: packet_string = str(packet_type) + str(packet_data) @@ -491,10 +658,6 @@ def _make_packet_header(packet_string): return ''.join(chr(x) for x in header_digits) -def _parse_engineIO_data(data): - return EngineIOData(data=get_unicode(data)) - - def _parse_socketIO_data(data): data = get_unicode(data) if data.startswith('/'): @@ -510,14 +673,11 @@ def _parse_socketIO_data(data): data = data[1:] except (ValueError, IndexError): ack_id = None - if data: - x = json.loads(data) - event = x[0] - args = x[1:] - else: - event = '' + try: + args = json.loads(data) + except ValueError: args = [] - return SocketIOData(path=path, ack_id=ack_id, event=event, args=args) + return SocketIOData(path=path, ack_id=ack_id, args=args) def _yield_warning_screen(seconds=None): diff --git a/socketIO_client/tests/__init__.py b/socketIO_client/tests/__init__.py index ed194c3..75be733 100644 --- a/socketIO_client/tests/__init__.py +++ b/socketIO_client/tests/__init__.py @@ -6,11 +6,19 @@ from .. import SocketIO, LoggingSocketIONamespace, find_callback HOST = 'localhost' PORT = 8000 +DATA = 'xxx' +PAYLOAD = {'xxx': 'yyy'} logging.basicConfig(level=logging.DEBUG) class BaseMixin(object): + def setUp(self): + self.called_on_response = False + + def tearDown(self): + del self.socketIO + def test_emit(self): 'Emit' namespace = self.socketIO.define(Namespace) @@ -20,6 +28,84 @@ class BaseMixin(object): 'emit_response': (), }) + def test_emit_with_payload(self): + 'Emit with payload' + namespace = self.socketIO.define(Namespace) + self.socketIO.emit('emit_with_payload', PAYLOAD) + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.args_by_event, { + 'emit_with_payload_response': (PAYLOAD,), + }) + + def test_emit_with_multiple_payloads(self): + 'Emit with multiple payloads' + namespace = self.socketIO.define(Namespace) + self.socketIO.emit('emit_with_multiple_payloads', PAYLOAD, PAYLOAD) + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.args_by_event, { + 'emit_with_multiple_payloads_response': (PAYLOAD, PAYLOAD), + }) + + def test_emit_with_callback(self): + 'Emit with callback' + self.socketIO.define(LoggingSocketIONamespace) + self.socketIO.emit('emit_with_callback', self.on_response) + self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) + self.assertTrue(self.called_on_response) + + def test_emit_with_callback_with_payload(self): + 'Emit with callback with payload' + self.socketIO.define(LoggingSocketIONamespace) + self.socketIO.emit( + 'emit_with_callback_with_payload', self.on_response) + self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) + self.assertTrue(self.called_on_response) + + def test_emit_with_callback_with_multiple_payloads(self): + 'Emit with callback with multiple payloads' + self.socketIO.define(LoggingSocketIONamespace) + self.socketIO.emit( + 'emit_with_callback_with_multiple_payloads', self.on_response) + self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) + self.assertTrue(self.called_on_response) + + def test_emit_with_event(self): + 'Emit to trigger an event' + self.socketIO.on('emit_with_event_response', self.on_response) + self.socketIO.emit('emit_with_event', PAYLOAD) + self.socketIO.wait(self.wait_time_in_seconds) + self.assertTrue(self.called_on_response) + + def test_send(self): + 'Send' + namespace = self.socketIO.define(Namespace) + self.socketIO.send() + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.response, 'message_response') + + def test_send_with_data(self): + 'Send with data' + namespace = self.socketIO.define(Namespace) + self.socketIO.send(DATA) + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.response, DATA) + + def on_response(self, *args): + for arg in args: + if isinstance(arg, dict): + self.assertEqual(arg, PAYLOAD) + else: + self.assertEqual(arg, DATA) + self.called_on_response = True + + +# class Test_WebsocketTransport(TestCase, BaseMixin): + + # def setUp(self): + # super(Test_WebsocketTransport, self).setUp() + # self.socketIO = SocketIO(HOST, PORT, transports=['websocket']) + # self.wait_time_in_seconds = 0.1 + class Test_XHR_PollingTransport(TestCase, BaseMixin): @@ -32,10 +118,21 @@ class Test_XHR_PollingTransport(TestCase, BaseMixin): class Namespace(LoggingSocketIONamespace): def initialize(self): + self.called_on_disconnect = False self.args_by_event = {} + self.response = None + + def on_disconnect(self): + self.called_on_disconnect = True + + def on_wait_with_disconnect_response(self): + self.disconnect() def on_event(self, event, *args): callback, args = find_callback(args) if callback: callback(*args) self.args_by_event[event] = args + + def on_message(self, data): + self.response = data diff --git a/socketIO_client/tests/index.html b/socketIO_client/tests/index.html index 86299b8..df37df0 100644 --- a/socketIO_client/tests/index.html +++ b/socketIO_client/tests/index.html @@ -1,4 +1,25 @@ diff --git a/socketIO_client/tests/proxy.js b/socketIO_client/tests/proxy.js index f5fbfca..6179788 100644 --- a/socketIO_client/tests/proxy.js +++ b/socketIO_client/tests/proxy.js @@ -1,5 +1,8 @@ var proxy = require('http-proxy').createProxyServer({ target: {host: 'localhost', port: 9000} +}).on('error', function(err, req, res) { + console.log('[ERROR] %s', err); + res.end(); }); var server = require('http').createServer(function(req, res) { console.log('[REQUEST.%s] %s', req.method, req.url); diff --git a/socketIO_client/tests/serve.js b/socketIO_client/tests/serve.js index c3e10c8..40af3e1 100644 --- a/socketIO_client/tests/serve.js +++ b/socketIO_client/tests/serve.js @@ -43,9 +43,9 @@ io.on('connection', function(socket) { socket.on('emit_with_event', function(payload) { socket.emit('emit_with_event_response', payload); }); - socket.on('ack', function(payload) { - socket.emit('ack_response', payload, function(payload) { - socket.emit('ack_callback_response', payload); + socket.on('trigger_server_expects_callback', function(payload) { + socket.emit('server_expects_callback', payload, function(payload) { + socket.emit('server_received_callback', payload); }); }); socket.on('aaa', function() { @@ -66,9 +66,9 @@ io.of('/chat').on('connection', function(socket) { socket.on('aaa', function() { socket.emit('aaa_response', 'in chat'); }); - socket.on('ack', function(payload) { - socket.emit('ack_response', payload, function(payload) { - socket.emit('ack_callback_response', payload); + socket.on('trigger_server_expects_callback', function(payload) { + socket.emit('server_expects_callback', payload, function(payload) { + socket.emit('server_received_callback', payload); }); }); });