diff --git a/.travis.yml b/.travis.yml index e6415c3..b0bc004 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,12 +7,14 @@ before_install: - sudo apt-get update - sudo apt-get install nodejs install: - - npm install -G socket.io@0.9 + - npm install -G socket.io + - npm install -G http-proxy - pip install -U requests - pip install -U six - pip install -U websocket-client - pip install -U coverage before_script: - - node serve-tests.js & + - DEBUG=* node socketIO_client/tests/serve.js & + - DEBUG=* node socketIO_client/tests/proxy.js & - sleep 3 script: nosetests diff --git a/CHANGES.rst b/CHANGES.rst index 9fcbea8..b7e7cb1 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,7 @@ +0.6.1 +----- +- Upgraded to socket.io protocol 1.x thanks to Sean Arietta and Joe Palmer + 0.5.5 ----- - Fixed reconnection in the event of server restart diff --git a/MANIFEST.in b/MANIFEST.in index 346209d..abf7482 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,3 @@ recursive-include socketIO_client * -include *.rst +include *.html *.js *.rst global-exclude *.pyc diff --git a/README.rst b/README.rst index 7bc0f48..967514e 100644 --- a/README.rst +++ b/README.rst @@ -1,4 +1,4 @@ -.. image:: https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=v0.5.4 +.. image:: https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=0.6.1 :target: https://travis-ci.org/invisibleroads/socketIO-client @@ -32,11 +32,17 @@ Activate isolated environment. :: Launch your socket.io server. :: - node serve-tests.js + # Get package folder + PACKAGE_FOLDER=`python -c "import os, socketIO_client; print(os.path.dirname(socketIO_client.__file__))"` + # Start socket.io server + DEBUG=* node $PACKAGE_FOLDER/tests/serve.js + # Start proxy server in a separate terminal on the same machine + DEBUG=* node $PACKAGE_FOLDER/tests/proxy.js For debugging information, run these commands first. :: import logging + logging.getLogger('requests').setLevel(logging.WARNING) logging.basicConfig(level=logging.DEBUG) Emit. :: @@ -52,7 +58,7 @@ Emit with callback. :: from socketIO_client import SocketIO, LoggingNamespace def on_bbb_response(*args): - print 'on_bbb_response', args + print('on_bbb_response', args) with SocketIO('localhost', 8000, LoggingNamespace) as socketIO: socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response) @@ -63,7 +69,7 @@ Define events. :: from socketIO_client import SocketIO, LoggingNamespace def on_aaa_response(*args): - print 'on_aaa_response', args + print('on_aaa_response', args) socketIO = SocketIO('localhost', 8000, LoggingNamespace) socketIO.on('aaa_response', on_aaa_response) @@ -77,7 +83,7 @@ Define events in a namespace. :: class Namespace(BaseNamespace): def on_aaa_response(self, *args): - print 'on_aaa_response', args + print('on_aaa_response', args) self.emit('bbb') socketIO = SocketIO('localhost', 8000, Namespace) @@ -91,7 +97,7 @@ Define standard events. :: class Namespace(BaseNamespace): def on_connect(self): - print '[Connected]' + print('[Connected]') socketIO = SocketIO('localhost', 8000, Namespace) socketIO.wait(seconds=1) @@ -103,12 +109,12 @@ Define different namespaces on a single socket. :: class ChatNamespace(BaseNamespace): def on_aaa_response(self, *args): - print 'on_aaa_response', args + print('on_aaa_response', args) class NewsNamespace(BaseNamespace): def on_aaa_response(self, *args): - print 'on_aaa_response', args + print('on_aaa_response', args) socketIO = SocketIO('localhost', 8000) chat_namespace = socketIO.define(ChatNamespace, '/chat') @@ -139,7 +145,7 @@ Wait forever. :: from socketIO_client import SocketIO - socketIO = SocketIO('localhost') + socketIO = SocketIO('localhost', 8000) socketIO.wait() diff --git a/TODO.goals b/TODO.goals index 677aa74..bde212b 100644 --- a/TODO.goals +++ b/TODO.goals @@ -1,10 +1,11 @@ -Revive heartbeat as separate process thanks to sarietta - Check that heartbeats are sent even with short wait time #64 -Restore namespace as separate process #50 -Upgrade to socket.io 1.2 #41 #52 +Release 0.6.1 #41 #52 Merge sarietta's pull request +Add Websocket transport + Update proxy to include websocket depending on argument + Use prepared request to get headers from http_session + Include https://github.com/invisibleroads/socketIO-client/issues/68 +Add test for on_reconnect using sarietta's bash scripts +Consider logging packets sent and received Implement rooms #65 -Run under Python 3 #51 -Use continuous integration with TravisCI -Credit everyone who took the time to submit an issue or pull request -Release 0.6.1 +Implement binary event +Implement binary ack diff --git a/setup.py b/setup.py index b98da09..51ab595 100644 --- a/setup.py +++ b/setup.py @@ -1,17 +1,24 @@ -import os -from setuptools import setup, find_packages +from os.path import abspath, dirname, join +from setuptools import find_packages, setup -here = os.path.abspath(os.path.dirname(__file__)) -README = open(os.path.join(here, 'README.rst')).read() -CHANGES = open(os.path.join(here, 'CHANGES.rst')).read() +REQUIREMENTS = [ + 'requests', + 'six', + 'websocket-client', +] +HERE = dirname(abspath(__file__)) +DESCRIPTION = '\n\n'.join(open(join(HERE, _)).read() for _ in [ + 'README.rst', + 'CHANGES.rst', +]) setup( - name='socketIO-client', - version='0.5.5', + name='socketIO_client', + version='0.6.1', description='A socket.io client library', - long_description=README + '\n\n' + CHANGES, + long_description=DESCRIPTION, license='MIT', classifiers=[ 'Intended Audience :: Developers', @@ -22,11 +29,7 @@ setup( author='Roy Hyunjin Han', author_email='rhh@crosscompute.com', url='https://github.com/invisibleroads/socketIO-client', - install_requires=[ - 'requests', - 'six', - 'websocket-client', - ], + install_requires=REQUIREMENTS, tests_require=[ 'nose', 'coverage', diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index f06b5cb..7518384 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,181 +1,270 @@ -import logging -import json -import requests -import time -from collections import namedtuple -try: - from urllib.parse import urlparse as parse_url -except ImportError: - from urlparse import urlparse as parse_url - -from .exceptions import ( - SocketIOError, ConnectionError, TimeoutError, PacketError) -from .transports import ( - _get_response, TRANSPORTS, - _WebsocketTransport, _XHR_PollingTransport, _JSONP_PollingTransport) +from .exceptions import ConnectionError, TimeoutError, PacketError +from .heartbeats import HeartbeatThread +from .logs import LoggingMixin +from .namespaces import ( + EngineIONamespace, SocketIONamespace, LoggingSocketIONamespace, + find_callback) +from .parsers import ( + parse_host, parse_engineIO_session, + format_socketIO_packet_data, parse_socketIO_packet_data, + get_namespace_path) +from .symmetries import get_character +from .transports import XHR_PollingTransport, prepare_http_session, TRANSPORTS -__version__ = '0.5.4' -_SocketIOSession = namedtuple('_SocketIOSession', [ - 'id', - 'heartbeat_timeout', - 'server_supported_transports', -]) -_log = logging.getLogger(__name__) -PROTOCOL_VERSION = 1 -RETRY_INTERVAL_IN_SECONDS = 1 +__all__ = 'SocketIO', 'SocketIONamespace' +__version__ = '0.6.1' +BaseNamespace = SocketIONamespace +LoggingNamespace = LoggingSocketIONamespace -class BaseNamespace(object): - 'Define client behavior' +def retry(f): + def wrap(*args, **kw): + self = args[0] + try: + return f(*args, **kw) + except (TimeoutError, ConnectionError): + self._opened = False + return f(*args, **kw) + return wrap - def __init__(self, _transport, path): - self._transport = _transport - self.path = path - self._was_connected = False - self._callback_by_event = {} - self.initialize() - def initialize(self): - 'Initialize custom variables here; you can override this method' +class EngineIO(LoggingMixin): + + def __init__( + self, host, port=None, Namespace=EngineIONamespace, + wait_for_connection=True, transports=TRANSPORTS, + resource='engine.io', hurry_interval_in_seconds=1, **kw): + self._is_secure, self._url = parse_host(host, port, resource) + self._wait_for_connection = wait_for_connection + self._client_transports = transports + self._hurry_interval_in_seconds = hurry_interval_in_seconds + self._kw = kw + self._log_name = self._url + self._wants_to_close = False + self._opened = False + if Namespace: + self.define(Namespace) + self._transport + + # Connect + + @property + def _transport(self): + if self._opened: + return self._transport_instance + self._engineIO_session = self._get_engineIO_session() + self._transport_instance = self._negotiate_transport() + self._connect_namespaces() + self._opened = True + self._reset_heartbeat() + return self._transport_instance + + def _get_engineIO_session(self): + warning_screen = self._yield_warning_screen() + self._http_session = prepare_http_session(self._kw) + for elapsed_time in warning_screen: + transport = XHR_PollingTransport( + self._http_session, self._is_secure, self._url) + try: + engineIO_packet_type, engineIO_packet_data = next( + transport.recv_packet()) + break + except (TimeoutError, ConnectionError) as e: + if not self._wait_for_connection: + raise + warning = Exception('[waiting for connection] %s' % e) + warning_screen.throw(warning) + assert engineIO_packet_type == 0 + return parse_engineIO_session(engineIO_packet_data) + + def _negotiate_transport(self): + self._transport_name = 'xhr-polling' + return self._get_transport(self._transport_name) + + def _reset_heartbeat(self): + try: + self._heartbeat_thread.halt() + except AttributeError: + pass + ping_interval = self._engineIO_session.ping_interval + if self._transport_name.endswith('-polling'): + hurry_interval_in_seconds = self._hurry_interval_in_seconds + else: + hurry_interval_in_seconds = ping_interval + self._heartbeat_thread = HeartbeatThread( + send_heartbeat=self._ping, + relax_interval_in_seconds=ping_interval, + hurry_interval_in_seconds=hurry_interval_in_seconds) + self._heartbeat_thread.start() + + def _connect_namespaces(self): pass - def message(self, data='', callback=None): - self._transport.message(self.path, data, callback) + def _get_transport(self, transport_name): + self._debug('[transport selected] %s', transport_name) + SelectedTransport = { + 'xhr-polling': XHR_PollingTransport, + }[transport_name] + return SelectedTransport( + self._http_session, self._is_secure, self._url, + self._engineIO_session) - def emit(self, event, *args, **kw): - callback, args = find_callback(args, kw) - self._transport.emit(self.path, event, args, callback) + def __enter__(self): + return self - def disconnect(self): - self._transport.disconnect(self.path) + def __exit__(self, *exception_pack): + self._close() + + def __del__(self): + self._close() + + # Define + + def define(self, Namespace): + self._namespace = namespace = Namespace(self) + return namespace def on(self, event, callback): - 'Define a callback to handle a custom event emitted by the server' - self._callback_by_event[event] = callback - - def on_connect(self): - 'Called after server connects; you can override this method' - pass - - def on_disconnect(self): - 'Called after server disconnects; you can override this method' - pass - - def on_heartbeat(self): - 'Called after server sends a heartbeat; you can override this method' - pass - - def on_message(self, data): - 'Called after server sends a message; you can override this method' - pass - - def on_event(self, event, *args): - """ - Called after server sends an event; you can override this method. - Called only if a custom event handler does not exist, - such as one defined by namespace.on('my_event', my_function). - """ - callback, args = find_callback(args) - if callback: - callback(*args) - - def on_error(self, reason, advice): - 'Called after server sends an error; you can override this method' - pass - - def on_noop(self): - 'Called after server sends a noop; you can override this method' - pass - - def on_open(self, *args): - pass - - def on_close(self, *args): - pass - - def on_retry(self, *args): - pass - - def on_reconnect(self, *args): - pass - - def _find_event_callback(self, event): - # Check callbacks defined by on() try: - return self._callback_by_event[event] + namespace = self.get_namespace() + except PacketError: + namespace = self.define(EngineIONamespace) + return namespace.on(event, callback) + + def get_namespace(self): + try: + return self._namespace + except AttributeError: + raise PacketError('undefined engine.io namespace') + + # Act + + def send(self, engineIO_packet_data): + self._message(engineIO_packet_data) + + @retry + def _open(self): + engineIO_packet_type = 0 + self._transport.send_packet(engineIO_packet_type, '') + + def _close(self): + self._wants_to_close = True + self._heartbeat_thread.halt() + if not self._opened: + return + engineIO_packet_type = 1 + self._transport.send_packet(engineIO_packet_type, '') + self._opened = False + + @retry + def _ping(self, engineIO_packet_data=''): + engineIO_packet_type = 2 + self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) + + @retry + def _pong(self, engineIO_packet_data=''): + engineIO_packet_type = 3 + self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) + + @retry + def _message(self, engineIO_packet_data): + engineIO_packet_type = 4 + self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) + self._debug('[socket.io packet sent] %s', engineIO_packet_data) + + @retry + def _upgrade(self): + engineIO_packet_type = 5 + self._transport.send_packet(engineIO_packet_type, '') + + @retry + def _noop(self): + engineIO_packet_type = 6 + self._transport.send_packet(engineIO_packet_type, '') + + # React + + def wait(self, seconds=None, **kw): + 'Wait in a loop and react to events as defined in the namespaces' + self._heartbeat_thread.hurry() + warning_screen = self._yield_warning_screen(seconds) + for elapsed_time in warning_screen: + if self._should_stop_waiting(**kw): + break + try: + try: + self._process_packets() + except TimeoutError: + pass + except ConnectionError as e: + try: + warning = Exception('[connection error] %s' % e) + warning_screen.throw(warning) + except StopIteration: + self._warn(warning) + try: + namespace = self.get_namespace() + namespace.on_disconnect() + except PacketError: + pass + self._heartbeat_thread.relax() + + def _should_stop_waiting(self): + return self._wants_to_close + + def _process_packets(self): + for engineIO_packet in self._transport.recv_packet(): + try: + self._process_packet(engineIO_packet) + except PacketError as e: + self._warn('[packet error] %s', e) + + def _process_packet(self, packet): + engineIO_packet_type, engineIO_packet_data = packet + # Launch callbacks + namespace = self.get_namespace() + try: + delegate = { + 0: self._on_open, + 1: self._on_close, + 2: self._on_ping, + 3: self._on_pong, + 4: self._on_message, + 5: self._on_upgrade, + 6: self._on_noop, + }[engineIO_packet_type] except KeyError: - pass - # Convert connect to reconnect if we have seen connect already - if event == 'connect': - if not self._was_connected: - self._was_connected = True - else: - event = 'reconnect' - # Check callbacks defined explicitly or use on_event() - return getattr( - self, - 'on_' + event.replace(' ', '_'), - lambda *args: self.on_event(event, *args)) + raise PacketError( + 'unexpected engine.io packet type (%s)' % engineIO_packet_type) + delegate(engineIO_packet_data, namespace._find_packet_callback) + if engineIO_packet_type is 4: + return engineIO_packet_data + + def _on_open(self, data, find_packet_callback): + find_packet_callback('open')() + + def _on_close(self, data, find_packet_callback): + find_packet_callback('close')() + + def _on_ping(self, data, find_packet_callback): + self._pong(data) + find_packet_callback('ping')(data) + + def _on_pong(self, data, find_packet_callback): + find_packet_callback('pong')(data) + + def _on_message(self, data, find_packet_callback): + find_packet_callback('message')(data) + + def _on_upgrade(self, data, find_packet_callback): + find_packet_callback('upgrade')() + + def _on_noop(self, data, find_packet_callback): + find_packet_callback('noop')() -class LoggingNamespace(BaseNamespace): - - def _log(self, level, msg, *attrs): - _log.log(level, '%s: %s' % (self._transport._url, msg), *attrs) - - def on_connect(self): - self._log(logging.DEBUG, '%s [connect]', self.path) - super(LoggingNamespace, self).on_connect() - - def on_disconnect(self): - self._log(logging.DEBUG, '%s [disconnect]', self.path) - super(LoggingNamespace, self).on_disconnect() - - def on_heartbeat(self): - self._log(logging.DEBUG, '%s [heartbeat]', self.path) - super(LoggingNamespace, self).on_heartbeat() - - def on_message(self, data): - self._log(logging.INFO, '%s [message] %s', self.path, data) - super(LoggingNamespace, self).on_message(data) - - def on_event(self, event, *args): - callback, args = find_callback(args) - arguments = [repr(_) for _ in args] - if callback: - arguments.append('callback(*args)') - self._log(logging.INFO, '%s [event] %s(%s)', self.path, event, - ', '.join(arguments)) - super(LoggingNamespace, self).on_event(event, *args) - - def on_error(self, reason, advice): - self._log(logging.INFO, '%s [error] %s', self.path, advice) - super(LoggingNamespace, self).on_error(reason, advice) - - def on_noop(self): - self._log(logging.INFO, '%s [noop]', self.path) - super(LoggingNamespace, self).on_noop() - - def on_open(self, *args): - self._log(logging.INFO, '%s [open] %s', self.path, args) - super(LoggingNamespace, self).on_open(*args) - - def on_close(self, *args): - self._log(logging.INFO, '%s [close] %s', self.path, args) - super(LoggingNamespace, self).on_close(*args) - - def on_retry(self, *args): - self._log(logging.INFO, '%s [retry] %s', self.path, args) - super(LoggingNamespace, self).on_retry(*args) - - def on_reconnect(self, *args): - self._log(logging.INFO, '%s [reconnect] %s', self.path, args) - super(LoggingNamespace, self).on_reconnect(*args) - - -class SocketIO(object): - +class SocketIO(EngineIO): """Create a socket.io client that connects to a socket.io server at the specified host and port. @@ -185,7 +274,8 @@ class SocketIO(object): - Specify desired transports=['websocket', 'xhr-polling']. - Pass query params, headers, cookies, proxies as keyword arguments. - SocketIO('localhost', 8000, + SocketIO( + 'localhost', 8000, params={'q': 'qqq'}, headers={'Authorization': 'Basic ' + b64encode('username:password')}, cookies={'a': 'aaa'}, @@ -193,330 +283,186 @@ class SocketIO(object): """ def __init__( - self, host, port=None, Namespace=None, + self, host, port=None, Namespace=SocketIONamespace, wait_for_connection=True, transports=TRANSPORTS, - resource='socket.io', **kw): - self.is_secure, self._base_url = _parse_host(host, port, resource) - self.wait_for_connection = wait_for_connection + resource='socket.io', hurry_interval_in_seconds=1, **kw): self._namespace_by_path = {} - self._client_supported_transports = transports - self._kw = kw - if Namespace: - self.define(Namespace) + self._callback_by_ack_id = {} + self._ack_id = 0 + super(SocketIO, self).__init__( + host, port, Namespace, wait_for_connection, transports, + resource, hurry_interval_in_seconds, **kw) - def _log(self, level, msg, *attrs): - _log.log(level, '%s: %s' % (self._base_url, msg), *attrs) - - def __enter__(self): - return self - - def __exit__(self, *exception_pack): - self.disconnect() - - def __del__(self): - self.disconnect() - - def define(self, Namespace, path=''): - if path: - self._transport.connect(path) - namespace = Namespace(self._transport, path) - self._namespace_by_path[path] = namespace - return namespace - - def on(self, event, callback, path=''): - if path not in self._namespace_by_path: - self.define(BaseNamespace, path) - return self.get_namespace(path).on(event, callback) - - def message(self, data='', callback=None, path=''): - self._transport.message(path, data, callback) - - def emit(self, event, *args, **kw): - path = kw.get('path', '') - callback, args = find_callback(args, kw) - self._transport.emit(path, event, args, callback) - - def wait(self, seconds=None, for_callbacks=False): - """Wait in a loop and process events as defined in the namespaces. - - - Omit seconds, i.e. call wait() without arguments, to wait forever. - """ - warning_screen = _yield_warning_screen(seconds) - timeout = min(self._heartbeat_interval, seconds) - for elapsed_time in warning_screen: - if self._stop_waiting(for_callbacks): - break - try: - try: - self._process_events(timeout) - except TimeoutError: - pass - next(self._heartbeat_pacemaker) - except ConnectionError as e: - try: - warning = Exception('[connection error] %s' % e) - warning_screen.throw(warning) - except StopIteration: - self._log(logging.WARNING, warning) - try: - namespace = self._namespace_by_path[''] - namespace.on_disconnect() - except KeyError: - pass - - def _process_events(self, timeout=None): - for packet in self._transport.recv_packet(timeout): - try: - self._process_packet(packet) - except PacketError as e: - self._log(logging.WARNING, '[packet error] %s', e) - - def _process_packet(self, packet): - code, packet_id, path, data = packet - namespace = self.get_namespace(path) - delegate = self._get_delegate(code) - delegate(packet, namespace._find_event_callback) - - def _stop_waiting(self, for_callbacks): - # Use __transport to make sure that we do not reconnect inadvertently - if for_callbacks and not self.__transport.has_ack_callback: - return True - if self.__transport._wants_to_disconnect: - return True - return False - - def wait_for_callbacks(self, seconds=None): - self.wait(seconds, for_callbacks=True) - - def disconnect(self, path=''): - try: - self._transport.disconnect(path) - except ReferenceError: - pass - try: - namespace = self._namespace_by_path[path] - namespace.on_disconnect() - del self._namespace_by_path[path] - except KeyError: - pass + # Connect @property def connected(self): - try: - transport = self.__transport - except AttributeError: - return False - else: - return transport.connected + return self._opened - @property - def _transport(self): - try: - if self.connected: - return self.__transport - except AttributeError: - pass - socketIO_session = self._get_socketIO_session() - supported_transports = self._get_supported_transports(socketIO_session) - self._heartbeat_pacemaker = self._make_heartbeat_pacemaker( - heartbeat_timeout=socketIO_session.heartbeat_timeout) - next(self._heartbeat_pacemaker) - warning_screen = _yield_warning_screen(seconds=None) - for elapsed_time in warning_screen: - try: - self._transport_name = supported_transports.pop(0) - except IndexError: - raise ConnectionError('Could not negotiate a transport') - try: - self.__transport = self._get_transport( - socketIO_session, self._transport_name) - break - except ConnectionError: - pass + def _connect_namespaces(self): for path, namespace in self._namespace_by_path.items(): - namespace._transport = self.__transport + namespace._transport = self._transport_instance if path: - self.__transport.connect(path) - return self.__transport + self.connect(path) - def _get_socketIO_session(self): - warning_screen = _yield_warning_screen(seconds=None) - for elapsed_time in warning_screen: - try: - return _get_socketIO_session( - self.is_secure, self._base_url, **self._kw) - except ConnectionError as e: - if not self.wait_for_connection: - raise - warning = Exception('[waiting for connection] %s' % e) - try: - warning_screen.throw(warning) - except StopIteration: - self._log(logging.WARNING, warning) + def __exit__(self, *exception_pack): + self.disconnect() + super(SocketIO, self).__exit__(*exception_pack) - def _get_supported_transports(self, session): - self._log( - logging.DEBUG, '[transports available] %s', - ' '.join(session.server_supported_transports)) - supported_transports = [ - x for x in self._client_supported_transports if - x in session.server_supported_transports] - if not supported_transports: - raise SocketIOError(' '.join([ - 'could not negotiate a transport:', - 'client supports %s but' % ', '.join( - self._client_supported_transports), - 'server supports %s' % ', '.join( - session.server_supported_transports), - ])) - return supported_transports + def __del__(self): + self.disconnect() + super(SocketIO, self).__del__() - def _get_transport(self, session, transport_name): - self._log(logging.DEBUG, '[transport chosen] %s', transport_name) - return { - 'websocket': _WebsocketTransport, - 'xhr-polling': _XHR_PollingTransport, - 'jsonp-polling': _JSONP_PollingTransport, - }[transport_name](session, self.is_secure, self._base_url, **self._kw) + # Define - def _make_heartbeat_pacemaker(self, heartbeat_timeout): - self._heartbeat_interval = heartbeat_timeout / 2 - heartbeat_time = time.time() - while True: - yield - if time.time() - heartbeat_time > self._heartbeat_interval: - heartbeat_time = time.time() - self._transport.send_heartbeat() + def define(self, Namespace, path=''): + if path: + self.connect(path) + self._namespace_by_path[path] = namespace = Namespace(self, path) + return namespace + + def on(self, event, callback, path=''): + try: + namespace = self.get_namespace(path) + except PacketError: + namespace = self.define(SocketIONamespace, path) + return namespace.on(event, callback) def get_namespace(self, path=''): try: return self._namespace_by_path[path] except KeyError: - raise PacketError('unhandled namespace path (%s)' % path) + raise PacketError('undefined socket.io namespace (%s)' % path) - def _get_delegate(self, code): + # Act + + def connect(self, path): + socketIO_packet_type = 0 + socketIO_packet_data = format_socketIO_packet_data(path) + self._message(str(socketIO_packet_type) + socketIO_packet_data) + + def disconnect(self, path=''): + if not self._opened: + return + if path: + socketIO_packet_type = 1 + socketIO_packet_data = format_socketIO_packet_data(path) + self._message(str(socketIO_packet_type) + socketIO_packet_data) + else: + self._close() try: - return { - '0': self._on_disconnect, - '1': self._on_connect, - '2': self._on_heartbeat, - '3': self._on_message, - '4': self._on_json, - '5': self._on_event, - '6': self._on_ack, - '7': self._on_error, - '8': self._on_noop, - }[code] + namespace = self._namespace_by_path.pop(path) + namespace.on_disconnect() except KeyError: - raise PacketError('unexpected code (%s)' % code) + pass - def _on_disconnect(self, packet, find_event_callback): - find_event_callback('disconnect')() + def emit(self, event, *args, **kw): + path = kw.get('path', '') + callback, args = find_callback(args, kw) + ack_id = self._set_ack_callback(callback) if callback else None + args = [event] + list(args) + socketIO_packet_type = 2 + socketIO_packet_data = format_socketIO_packet_data(path, ack_id, args) + self._message(str(socketIO_packet_type) + socketIO_packet_data) - def _on_connect(self, packet, find_event_callback): - find_event_callback('connect')() - - def _on_heartbeat(self, packet, find_event_callback): - find_event_callback('heartbeat')() - - def _on_message(self, packet, find_event_callback): - code, packet_id, path, data = packet + def send(self, data='', callback=None, **kw): + path = kw.get('path', '') args = [data] - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback('message')(*args) + if callback: + args.append(callback) + self.emit('message', *args, path=path) - def _on_json(self, packet, find_event_callback): - code, packet_id, path, data = packet - args = [json.loads(data)] - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback('message')(*args) + def _ack(self, path, ack_id, *args): + socketIO_packet_type = 3 + socketIO_packet_data = format_socketIO_packet_data(path, ack_id, args) + self._message(str(socketIO_packet_type) + socketIO_packet_data) - def _on_event(self, packet, find_event_callback): - code, packet_id, path, data = packet - value_by_name = json.loads(data) - event = value_by_name['name'] - args = value_by_name.get('args', []) - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback(event)(*args) + # React - def _on_ack(self, packet, find_event_callback): - code, packet_id, path, data = packet - data_parts = data.split('+', 1) - packet_id = data_parts[0] + def wait(self, seconds=None, for_callbacks=False): + super(SocketIO, self).wait(seconds, for_callbacks=for_callbacks) + + def wait_for_callbacks(self, seconds=None): + self.wait(seconds, for_callbacks=True) + + def _should_stop_waiting(self, for_callbacks): + if for_callbacks and not self._has_ack_callback: + return True + return super(SocketIO, self)._should_stop_waiting() + + def _process_packet(self, packet): + engineIO_packet_data = super(SocketIO, self)._process_packet(packet) + if engineIO_packet_data is None: + return + self._debug('[socket.io packet received] %s', engineIO_packet_data) + socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) + socketIO_packet_data = engineIO_packet_data[1:] + # Launch callbacks + path = get_namespace_path(socketIO_packet_data) + namespace = self.get_namespace(path) try: - ack_callback = self._transport.get_ack_callback(packet_id) + delegate = { + 0: self._on_connect, + 1: self._on_disconnect, + 2: self._on_event, + 3: self._on_ack, + 4: self._on_error, + 5: self._on_binary_event, + 6: self._on_binary_ack, + }[socketIO_packet_type] + except KeyError: + raise PacketError( + 'unexpected socket.io packet type (%s)' % socketIO_packet_type) + delegate(socketIO_packet_data, namespace._find_packet_callback) + return socketIO_packet_data + + def _on_connect(self, data, find_packet_callback): + find_packet_callback('connect')() + + def _on_disconnect(self, data, find_packet_callback): + find_packet_callback('disconnect')() + + def _on_event(self, data, find_packet_callback): + data_parsed = parse_socketIO_packet_data(data) + args = data_parsed.args + try: + event = args.pop(0) + except IndexError: + raise PacketError('missing event name') + if data_parsed.ack_id is not None: + args.append(self._prepare_to_send_ack( + data_parsed.path, data_parsed.ack_id)) + find_packet_callback(event)(*args) + + def _on_ack(self, data, find_packet_callback): + data_parsed = parse_socketIO_packet_data(data) + try: + ack_callback = self._get_ack_callback(data_parsed.ack_id) except KeyError: return - args = json.loads(data_parts[1]) if len(data_parts) > 1 else [] - ack_callback(*args) + ack_callback(*data_parsed.args) - def _on_error(self, packet, find_event_callback): - code, packet_id, path, data = packet - reason, advice = data.split('+', 1) - find_event_callback('error')(reason, advice) + def _on_error(self, data, find_packet_callback): + find_packet_callback('error')(data) - def _on_noop(self, packet, find_event_callback): - find_event_callback('noop')() + def _on_binary_event(self, data, find_packet_callback): + self._warn('[not implemented] binary event') - def _prepare_to_send_ack(self, path, packet_id): + def _on_binary_ack(self, data, find_packet_callback): + self._warn('[not implemented] binary ack') + + def _prepare_to_send_ack(self, path, ack_id): 'Return function that acknowledges the server' - return lambda *args: self._transport.ack(path, packet_id, *args) + return lambda *args: self._ack(path, ack_id, *args) + def _set_ack_callback(self, callback): + self._ack_id += 1 + self._callback_by_ack_id[self._ack_id] = callback + return self._ack_id -def find_callback(args, kw=None): - 'Return callback whether passed as a last argument or as a keyword' - if args and callable(args[-1]): - return args[-1], args[:-1] - try: - return kw['callback'], args - except (KeyError, TypeError): - return None, args + def _get_ack_callback(self, ack_id): + return self._callback_by_ack_id.pop(ack_id) - -def _parse_host(host, port, resource): - if not host.startswith('http'): - host = 'http://' + host - url_pack = parse_url(host) - is_secure = url_pack.scheme == 'https' - port = port or url_pack.port or (443 if is_secure else 80) - base_url = '%s:%d%s/%s/%s' % ( - url_pack.hostname, port, url_pack.path, resource, PROTOCOL_VERSION) - return is_secure, base_url - - -def _yield_warning_screen(seconds=None): - last_warning = None - for elapsed_time in _yield_elapsed_time(seconds): - try: - yield elapsed_time - except Exception as warning: - warning = str(warning) - if last_warning != warning: - last_warning = warning - _log.warn(warning) - time.sleep(RETRY_INTERVAL_IN_SECONDS) - - -def _yield_elapsed_time(seconds=None): - start_time = time.time() - if seconds is None: - while True: - yield time.time() - start_time - while time.time() - start_time < seconds: - yield time.time() - start_time - - -def _get_socketIO_session(is_secure, base_url, **kw): - server_url = '%s://%s/' % ('https' if is_secure else 'http', base_url) - try: - response = _get_response(requests.get, server_url, **kw) - except TimeoutError as e: - raise ConnectionError(e) - response_parts = response.text.split(':') - return _SocketIOSession( - id=response_parts[0], - heartbeat_timeout=int(response_parts[1]), - server_supported_transports=response_parts[3].split(',')) + @property + def _has_ack_callback(self): + return True if self._callback_by_ack_id else False diff --git a/socketIO_client/heartbeats.py b/socketIO_client/heartbeats.py new file mode 100644 index 0000000..db2599d --- /dev/null +++ b/socketIO_client/heartbeats.py @@ -0,0 +1,47 @@ +from threading import Thread, Event + +from .exceptions import ConnectionError, TimeoutError + + +class HeartbeatThread(Thread): + + daemon = True + + def __init__( + self, send_heartbeat, + relax_interval_in_seconds, + hurry_interval_in_seconds): + super(HeartbeatThread, self).__init__() + self._send_heartbeat = send_heartbeat + self._relax_interval_in_seconds = relax_interval_in_seconds + self._hurry_interval_in_seconds = hurry_interval_in_seconds + self._adrenaline = Event() + self._rest = Event() + self._halt = Event() + + def run(self): + try: + while not self._halt.is_set(): + try: + self._send_heartbeat() + except TimeoutError: + pass + if self._adrenaline.is_set(): + interval_in_seconds = self._hurry_interval_in_seconds + else: + interval_in_seconds = self._relax_interval_in_seconds + self._rest.wait(interval_in_seconds) + except ConnectionError: + pass + + def relax(self): + self._adrenaline.clear() + + def hurry(self): + self._adrenaline.set() + self._rest.set() + self._rest.clear() + + def halt(self): + self._rest.set() + self._halt.set() diff --git a/socketIO_client/logs.py b/socketIO_client/logs.py new file mode 100644 index 0000000..ff37d19 --- /dev/null +++ b/socketIO_client/logs.py @@ -0,0 +1,38 @@ +import logging +import time + + +class LoggingMixin(object): + + def _log(self, level, msg, *attrs): + logging.log(level, '%s %s' % (self._log_name, msg), *attrs) + + def _debug(self, msg, *attrs): + self._log(logging.DEBUG, msg, *attrs) + + def _info(self, msg, *attrs): + self._log(logging.INFO, msg, *attrs) + + def _warn(self, msg, *attrs): + self._log(logging.WARNING, msg, *attrs) + + def _yield_warning_screen(self, seconds=None): + last_warning = None + for elapsed_time in _yield_elapsed_time(seconds): + try: + yield elapsed_time + except Exception as warning: + warning = str(warning) + if last_warning != warning: + last_warning = warning + self._warn(warning) + time.sleep(1) + + +def _yield_elapsed_time(seconds=None): + start_time = time.time() + if seconds is None: + while True: + yield time.time() - start_time + while time.time() - start_time < seconds: + yield time.time() - start_time diff --git a/socketIO_client/namespaces.py b/socketIO_client/namespaces.py new file mode 100644 index 0000000..84cd65c --- /dev/null +++ b/socketIO_client/namespaces.py @@ -0,0 +1,220 @@ +from .logs import LoggingMixin + + +class EngineIONamespace(LoggingMixin): + 'Define engine.io client behavior' + + def __init__(self, io): + self._io = io + self._callback_by_event = {} + self._log_name = io._url + self.initialize() + + def initialize(self): + """Initialize custom variables here. + You can override this method.""" + + def on(self, event, callback): + 'Define a callback to handle an event emitted by the server' + self._callback_by_event[event] = callback + + def send(self, data): + 'Send a message' + self._io.send(data) + + def on_open(self): + """Called after engine.io connects. + You can override this method.""" + + def on_close(self): + """Called after engine.io disconnects. + You can override this method.""" + + def on_ping(self, data): + """Called after engine.io sends a ping packet. + You can override this method.""" + + def on_pong(self, data): + """Called after engine.io sends a pong packet. + You can override this method.""" + + def on_message(self, data): + """Called after engine.io sends a message packet. + You can override this method.""" + + def on_upgrade(self): + """Called after engine.io sends an upgrade packet. + You can override this method.""" + + def on_noop(self): + """Called after engine.io sends a noop packet. + You can override this method.""" + + def _find_packet_callback(self, event): + # Check callbacks defined by on() + try: + return self._callback_by_event[event] + except KeyError: + pass + # Check callbacks defined explicitly + return getattr(self, 'on_' + event) + + +class SocketIONamespace(EngineIONamespace): + 'Define socket.io client behavior' + + def __init__(self, io, path): + self.path = path + super(SocketIONamespace, self).__init__(io) + + def connect(self): + self._io.connect(self.path) + + def disconnect(self): + self._io.disconnect(self.path) + + def emit(self, event, *args, **kw): + self._io.emit(event, path=self.path, *args, **kw) + + def send(self, data='', callback=None): + self._io.send(data, callback) + + def on_connect(self): + """Called after socket.io connects. + You can override this method.""" + + def on_reconnect(self): + """Called after socket.io reconnects. + You can override this method.""" + + def on_disconnect(self): + """Called after socket.io disconnects. + You can override this method.""" + + def on_event(self, event, *args): + """ + Called if there is no matching event handler. + You can override this method. + There are three ways to define an event handler: + + - Call socketIO.on() + + socketIO = SocketIO('localhost', 8000) + socketIO.on('my_event', my_function) + + - Call namespace.on() + + namespace = socketIO.get_namespace() + namespace.on('my_event', my_function) + + - Define namespace.on_xxx + + class Namespace(SocketIONamespace): + + def on_my_event(self, *args): + my_function(*args) + + socketIO.define(Namespace)""" + + def on_error(self, data): + """Called after socket.io sends an error packet. + You can override this method.""" + + def _find_packet_callback(self, event): + # Interpret events + if event == 'connect': + if not hasattr(self, '_was_connected'): + self._was_connected = True + else: + event = 'reconnect' + # Check callbacks defined by on() + try: + return self._callback_by_event[event] + except KeyError: + pass + # Check callbacks defined explicitly or use on_event() + return getattr( + self, 'on_' + event.replace(' ', '_'), + lambda *args: self.on_event(event, *args)) + + +class LoggingEngineIONamespace(EngineIONamespace): + + def on_open(self): + self._debug('[open]') + super(LoggingEngineIONamespace, self).on_open() + + def on_close(self): + self._debug('[close]') + super(LoggingEngineIONamespace, self).on_close() + + def on_ping(self, data): + self._debug('[ping] %s', data) + super(LoggingEngineIONamespace, self).on_ping(data) + + def on_pong(self, data): + self._debug('[pong] %s', data) + super(LoggingEngineIONamespace, self).on_pong(data) + + def on_message(self, data): + self._debug('[message] %s', data) + super(LoggingEngineIONamespace, self).on_message(data) + + def on_upgrade(self): + self._debug('[upgrade]') + super(LoggingEngineIONamespace, self).on_upgrade() + + def on_noop(self): + self._debug('[noop]') + super(LoggingEngineIONamespace, self).on_noop() + + def on_event(self, event, *args): + callback, args = find_callback(args) + arguments = [repr(_) for _ in args] + if callback: + arguments.append('callback(*args)') + self._info('[event] %s(%s)', event, ', '.join(arguments)) + super(LoggingEngineIONamespace, self).on_event(event, *args) + + +class LoggingSocketIONamespace(SocketIONamespace): + + def on_connect(self): + self._debug('%s[connect]', _make_logging_header(self.path)) + super(LoggingSocketIONamespace, self).on_connect() + + def on_reconnect(self): + self._debug('%s[reconnect]', _make_logging_header(self.path)) + super(LoggingSocketIONamespace, self).on_reconnect() + + def on_disconnect(self): + self._debug('%s[disconnect]', _make_logging_header(self.path)) + super(LoggingSocketIONamespace, self).on_disconnect() + + def on_event(self, event, *args): + callback, args = find_callback(args) + arguments = [repr(_) for _ in args] + if callback: + arguments.append('callback(*args)') + self._info( + '%s[event] %s(%s)', _make_logging_header(self.path), event, + ', '.join(arguments)) + super(LoggingSocketIONamespace, self).on_event(event, *args) + + def on_error(self, data): + self._debug('%s[error] %s', _make_logging_header(self.path), data) + super(LoggingSocketIONamespace, self).on_error() + + +def find_callback(args, kw=None): + 'Return callback whether passed as a last argument or as a keyword' + if args and callable(args[-1]): + return args[-1], args[:-1] + try: + return kw['callback'], args + except (KeyError, TypeError): + return None, args + + +def _make_logging_header(path): + return path + ' ' if path else '' diff --git a/socketIO_client/parsers.py b/socketIO_client/parsers.py new file mode 100644 index 0000000..c962d77 --- /dev/null +++ b/socketIO_client/parsers.py @@ -0,0 +1,127 @@ +import json +from collections import namedtuple + +from .symmetries import ( + decode_string, encode_string, get_byte, get_character, parse_url) + + +EngineIOSession = namedtuple('EngineIOSession', [ + 'id', 'ping_interval', 'ping_timeout', 'transport_upgrades']) +SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'args']) + + +def parse_host(host, port, resource): + if not host.startswith('http'): + host = 'http://' + host + url_pack = parse_url(host) + is_secure = url_pack.scheme == 'https' + port = port or url_pack.port or (443 if is_secure else 80) + url = '%s:%d%s/%s' % (url_pack.hostname, port, url_pack.path, resource) + return is_secure, url + + +def parse_engineIO_session(engineIO_packet_data): + d = json.loads(decode_string(engineIO_packet_data)) + return EngineIOSession( + id=d['sid'], + ping_interval=d['pingInterval'] / float(1000), + ping_timeout=d['pingTimeout'] / float(1000), + transport_upgrades=d['upgrades']) + + +def encode_engineIO_content(engineIO_packets): + content = bytearray() + for packet_type, packet_data in engineIO_packets: + packet_string = encode_string(str(packet_type) + packet_data) + content.extend(_make_packet_header(packet_string) + packet_string) + return content + + +def decode_engineIO_content(content): + content_index = 0 + content_length = len(content) + while content_index < content_length: + try: + content_index, packet_length = _read_packet_length( + content, content_index) + except IndexError: + break + content_index, packet_string = _read_packet_string( + content, content_index, packet_length) + engineIO_packet_type = int(get_character(packet_string, 0)) + engineIO_packet_data = packet_string[1:] + yield engineIO_packet_type, engineIO_packet_data + + +def format_socketIO_packet_data(path=None, ack_id=None, args=None): + socketIO_packet_data = json.dumps(args, ensure_ascii=False) if args else '' + if ack_id is not None: + socketIO_packet_data = str(ack_id) + socketIO_packet_data + if path: + socketIO_packet_data = path + ',' + socketIO_packet_data + return socketIO_packet_data + + +def parse_socketIO_packet_data(socketIO_packet_data): + data = decode_string(socketIO_packet_data) + if data.startswith('/'): + try: + path, data = data.split(',', 1) + except ValueError: + path = data + data = '' + else: + path = '' + try: + ack_id_string, data = data.split('[', 1) + data = '[' + data + ack_id = int(ack_id_string) + except (ValueError, IndexError): + ack_id = None + try: + args = json.loads(data) + except ValueError: + args = [] + return SocketIOData(path=path, ack_id=ack_id, args=args) + + +def get_namespace_path(socketIO_packet_data): + if not socketIO_packet_data.startswith(b'/'): + return '' + # Loop incrementally in case there is binary data + parts = [] + for i in range(len(socketIO_packet_data)): + character = get_character(socketIO_packet_data, i) + if ',' == character: + break + parts.append(character) + return ''.join(parts) + + +def _make_packet_header(packet_string): + length_string = str(len(packet_string)) + header_digits = bytearray([0]) + for i in range(len(length_string)): + header_digits.append(ord(length_string[i]) - 48) + header_digits.append(255) + return header_digits + + +def _read_packet_length(content, content_index): + while get_byte(content, content_index) != 0: + content_index += 1 + content_index += 1 + packet_length_string = '' + byte = get_byte(content, content_index) + while byte != 255: + packet_length_string += str(byte) + content_index += 1 + byte = get_byte(content, content_index) + return content_index, int(packet_length_string) + + +def _read_packet_string(content, content_index, packet_length): + while get_byte(content, content_index) == 255: + content_index += 1 + packet_string = content[content_index:content_index + packet_length] + return content_index + packet_length, packet_string diff --git a/socketIO_client/symmetries.py b/socketIO_client/symmetries.py new file mode 100644 index 0000000..b061a20 --- /dev/null +++ b/socketIO_client/symmetries.py @@ -0,0 +1,21 @@ +import six +try: + from urllib.parse import urlparse as parse_url +except ImportError: + from urlparse import urlparse as parse_url + + +def get_byte(x, index): + return six.indexbytes(x, index) + + +def get_character(x, index): + return chr(six.indexbytes(x, index)) + + +def encode_string(x): + return x.encode('utf-8') + + +def decode_string(x): + return x.decode('utf-8') diff --git a/socketIO_client/tests.py b/socketIO_client/tests/__init__.py similarity index 70% rename from socketIO_client/tests.py rename to socketIO_client/tests/__init__.py index 400d626..2ce4856 100644 --- a/socketIO_client/tests.py +++ b/socketIO_client/tests/__init__.py @@ -2,8 +2,7 @@ import logging import time from unittest import TestCase -from . import SocketIO, LoggingNamespace, find_callback -from .transports import TIMEOUT_IN_SECONDS +from .. import SocketIO, LoggingNamespace, find_callback HOST = 'localhost' @@ -16,22 +15,15 @@ logging.basicConfig(level=logging.DEBUG) class BaseMixin(object): def setUp(self): + super(BaseMixin, self).setUp() self.called_on_response = False def tearDown(self): + super(BaseMixin, self).tearDown() del self.socketIO - def on_response(self, *args): - for arg in args: - if isinstance(arg, dict): - self.assertEqual(arg, PAYLOAD) - else: - self.assertEqual(arg, DATA) - self.called_on_response = True - def test_disconnect(self): 'Disconnect' - self.socketIO.define(LoggingNamespace) self.assertTrue(self.socketIO.connected) self.socketIO.disconnect() self.assertFalse(self.socketIO.connected) @@ -43,41 +35,6 @@ class BaseMixin(object): self.assertTrue(namespace.called_on_disconnect) self.assertFalse(self.socketIO.connected) - def test_message(self): - 'Message' - namespace = self.socketIO.define(Namespace) - self.socketIO.message() - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, 'message_response') - - def test_message_with_data(self): - 'Message with data' - namespace = self.socketIO.define(Namespace) - self.socketIO.message(DATA) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, DATA) - - def test_message_with_payload(self): - 'Message with payload' - namespace = self.socketIO.define(Namespace) - self.socketIO.message(PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, PAYLOAD) - - def test_message_with_callback(self): - 'Message with callback' - self.socketIO.define(LoggingNamespace) - self.socketIO.message(callback=self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - - def test_message_with_callback_with_data(self): - 'Message with callback with data' - self.socketIO.define(LoggingNamespace) - self.socketIO.message(DATA, self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - def test_emit(self): 'Emit' namespace = self.socketIO.define(Namespace) @@ -107,14 +64,12 @@ class BaseMixin(object): def test_emit_with_callback(self): 'Emit with callback' - self.socketIO.define(LoggingNamespace) self.socketIO.emit('emit_with_callback', self.on_response) self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_emit_with_callback_with_payload(self): 'Emit with callback with payload' - self.socketIO.define(LoggingNamespace) self.socketIO.emit( 'emit_with_callback_with_payload', self.on_response) self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) @@ -122,7 +77,6 @@ class BaseMixin(object): def test_emit_with_callback_with_multiple_payloads(self): 'Emit with callback with multiple payloads' - self.socketIO.define(LoggingNamespace) self.socketIO.emit( 'emit_with_callback_with_multiple_payloads', self.on_response) self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) @@ -135,14 +89,28 @@ class BaseMixin(object): self.socketIO.wait(self.wait_time_in_seconds) self.assertTrue(self.called_on_response) - def test_ack(self): - 'Trigger server callback' + def test_send(self): + 'Send' namespace = self.socketIO.define(Namespace) - self.socketIO.emit('ack', PAYLOAD) + self.socketIO.send() + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.response, 'message_response') + + def test_send_with_data(self): + 'Send with data' + namespace = self.socketIO.define(Namespace) + self.socketIO.send(DATA) + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.response, DATA) + + def test_ack(self): + 'Respond to a server callback request' + namespace = self.socketIO.define(Namespace) + self.socketIO.emit('trigger_server_expects_callback', PAYLOAD) self.socketIO.wait(self.wait_time_in_seconds) self.assertEqual(namespace.args_by_event, { - 'ack_response': (PAYLOAD,), - 'ack_callback_response': (PAYLOAD,), + 'server_expects_callback': (PAYLOAD,), + 'server_received_callback': (PAYLOAD,), }) def test_wait_with_disconnect(self): @@ -168,52 +136,45 @@ class BaseMixin(object): }) def test_namespace_ack(self): - 'Trigger server callback' + 'Respond to a server callback request within a namespace' chat_namespace = self.socketIO.define(Namespace, '/chat') - chat_namespace.emit('ack', PAYLOAD) + chat_namespace.emit('trigger_server_expects_callback', PAYLOAD) self.socketIO.wait(self.wait_time_in_seconds) self.assertEqual(chat_namespace.args_by_event, { - 'ack_response': (PAYLOAD,), - 'ack_callback_response': (PAYLOAD,), + 'server_expects_callback': (PAYLOAD,), + 'server_received_callback': (PAYLOAD,), }) - -class Test_WebsocketTransport(TestCase, BaseMixin): - - def setUp(self): - super(Test_WebsocketTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['websocket']) - self.wait_time_in_seconds = 0.1 + def on_response(self, *args): + for arg in args: + if isinstance(arg, dict): + self.assertEqual(arg, PAYLOAD) + else: + self.assertEqual(arg, DATA) + self.called_on_response = True -class Test_XHR_PollingTransport(TestCase, BaseMixin): +class Test_XHR_PollingTransport(BaseMixin, TestCase): def setUp(self): super(Test_XHR_PollingTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['xhr-polling']) - self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 - - -class Test_JSONP_PollingTransport(TestCase, BaseMixin): - - def setUp(self): - super(Test_JSONP_PollingTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['jsonp-polling']) - self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 + self.socketIO = SocketIO(HOST, PORT, LoggingNamespace, transports=[ + 'xhr-polling']) + self.wait_time_in_seconds = 1 class Namespace(LoggingNamespace): def initialize(self): - self.response = None - self.args_by_event = {} self.called_on_disconnect = False + self.args_by_event = {} + self.response = None def on_disconnect(self): self.called_on_disconnect = True - def on_message(self, data): - self.response = data + def on_wait_with_disconnect_response(self): + self.disconnect() def on_event(self, event, *args): callback, args = find_callback(args) @@ -221,5 +182,5 @@ class Namespace(LoggingNamespace): callback(*args) self.args_by_event[event] = args - def on_wait_with_disconnect_response(self): - self.disconnect() + def on_message(self, data): + self.response = data diff --git a/socketIO_client/tests/index.html b/socketIO_client/tests/index.html new file mode 100644 index 0000000..df37df0 --- /dev/null +++ b/socketIO_client/tests/index.html @@ -0,0 +1,25 @@ + + diff --git a/socketIO_client/tests/proxy.js b/socketIO_client/tests/proxy.js new file mode 100644 index 0000000..6179788 --- /dev/null +++ b/socketIO_client/tests/proxy.js @@ -0,0 +1,36 @@ +var proxy = require('http-proxy').createProxyServer({ + target: {host: 'localhost', port: 9000} +}).on('error', function(err, req, res) { + console.log('[ERROR] %s', err); + res.end(); +}); +var server = require('http').createServer(function(req, res) { + console.log('[REQUEST.%s] %s', req.method, req.url); + console.log(req['headers']); + if (req.method == 'POST') { + var body = ''; + req.on('data', function (data) { + body += data; + }); + req.on('end', function () { + print_body('[REQUEST.BODY] ', body); + }); + } + var write = res.write; + res.write = function(data) { + print_body('[RESPONSE.BODY] ', data); + write.call(res, data); + } + proxy.web(req, res); +}); +function print_body(header, body) { + var text = String(body); + console.log(header + text); + if (text.charCodeAt(0) != 0) return; + for (var i = 0; i < text.length; i++) { + var character_code = text.charCodeAt(i); + console.log('body[%s] = %s = %s', i, text[i], character_code); + if (character_code == 65533) break; + } +} +server.listen(8000); diff --git a/serve-tests.js b/socketIO_client/tests/serve.js similarity index 54% rename from serve-tests.js rename to socketIO_client/tests/serve.js index 6c2edfc..40af3e1 100644 --- a/serve-tests.js +++ b/socketIO_client/tests/serve.js @@ -1,17 +1,25 @@ -var io = require('socket.io').listen(8000); +// DEBUG=* node serve.js -var main = io.of('').on('connection', function(socket) { +var app = require('http').createServer(serve).listen(9000); +var io = require('socket.io')(app); +var fs = require('fs'); +var PAYLOAD = {'xxx': 'yyy'}; + +io.on('connection', function(socket) { socket.on('message', function(data, fn) { - if (fn) { // Client expects a callback + if (fn) { + // Client requests callback if (data) { fn(data); } else { fn(); } } else if (typeof data === 'object') { - socket.json.send(data ? data : 'message_response'); // object or null + // Data has type object or is null + socket.json.send(data ? data : 'message_response'); } else { - socket.send(data ? data : 'message_response'); // string or '' + // Data has type string or is '' + socket.send(data ? data : 'message_response'); } }); socket.on('emit', function() { @@ -20,8 +28,8 @@ var main = io.of('').on('connection', function(socket) { socket.on('emit_with_payload', function(payload) { socket.emit('emit_with_payload_response', payload); }); - socket.on('emit_with_multiple_payloads', function(payload, payload) { - socket.emit('emit_with_multiple_payloads_response', payload, payload); + socket.on('emit_with_multiple_payloads', function(payload1, payload2) { + socket.emit('emit_with_multiple_payloads_response', payload1, payload2); }); socket.on('emit_with_callback', function(fn) { fn(); @@ -35,39 +43,37 @@ var main = io.of('').on('connection', function(socket) { socket.on('emit_with_event', function(payload) { socket.emit('emit_with_event_response', payload); }); - socket.on('ack', function(payload) { - socket.emit('ack_response', payload, function(payload) { - socket.emit('ack_callback_response', payload); + socket.on('trigger_server_expects_callback', function(payload) { + socket.emit('server_expects_callback', payload, function(payload) { + socket.emit('server_received_callback', payload); }); }); socket.on('aaa', function() { socket.emit('aaa_response', PAYLOAD); }); socket.on('bbb', function(payload, fn) { - if (fn) { - fn(payload); - } + if (fn) fn(payload); }); socket.on('wait_with_disconnect', function() { socket.emit('wait_with_disconnect_response'); }); }); -var chat = io.of('/chat').on('connection', function (socket) { +io.of('/chat').on('connection', function(socket) { socket.on('emit_with_payload', function(payload) { socket.emit('emit_with_payload_response', payload); }); socket.on('aaa', function() { socket.emit('aaa_response', 'in chat'); }); - socket.on('ack', function(payload) { - socket.emit('ack_response', payload, function(payload) { - socket.emit('ack_callback_response', payload); + socket.on('trigger_server_expects_callback', function(payload) { + socket.emit('server_expects_callback', payload, function(payload) { + socket.emit('server_received_callback', payload); }); }); }); -var news = io.of('/news').on('connection', function (socket) { +io.of('/news').on('connection', function(socket) { socket.on('emit_with_payload', function(payload) { socket.emit('emit_with_payload_response', payload); }); @@ -76,4 +82,9 @@ var news = io.of('/news').on('connection', function (socket) { }); }); -var PAYLOAD = {'xxx': 'yyy'}; +function serve(req, res) { + fs.readFile(__dirname + '/index.html', function(err, data) { + res.writeHead(200); + res.end(data); + }); +} diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index b6cc4e3..dc187cc 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -1,320 +1,82 @@ -import codecs -import json -import logging -import re import requests -import six -import socket -import sys import time -import websocket._exceptions from .exceptions import ConnectionError, TimeoutError +from .parsers import decode_engineIO_content, encode_engineIO_content -if not hasattr(websocket, 'create_connection'): - sys.exit("""Incompatible websocket implementation -- Please make sure that you have websocket-client installed -- Please remove other websocket implementations""") +ENGINEIO_PROTOCOL = 3 +TRANSPORTS = 'websocket', 'xhr-polling' -TRANSPORTS = 'websocket', 'xhr-polling', 'jsonp-polling' -BOUNDARY = six.u('\ufffd') -TIMEOUT_IN_SECONDS = 3 -_log = logging.getLogger(__name__) -escape_unicode = lambda x: codecs.getdecoder('unicode_escape')(x)[0] -try: - unicode -except NameError: - encode_unicode = lambda x: x -else: - encode_unicode = lambda x: unicode(x).encode('utf-8') +class AbstractTransport(object): + + def __init__(self, http_session, is_secure, url, engineIO_session=None): + self.http_session = http_session + self.is_secure = is_secure + self.url = url + self.engineIO_session = engineIO_session + + def recv_packet(self): + pass + + def send_packet(self, engineIO_packet_type, engineIO_packet_data): + pass -class _AbstractTransport(object): +class XHR_PollingTransport(AbstractTransport): - def __init__(self): - self._packet_id = 0 - self._callback_by_packet_id = {} - self._wants_to_disconnect = False - self._packets = [] - - def _log(self, level, msg, *attrs): - _log.log(level, '[%s] %s' % (self._url, msg), *attrs) - - def disconnect(self, path=''): - if not path: - self._wants_to_disconnect = True - if not self.connected: - return - if path: - self.send_packet(0, path) + def __init__(self, http_session, is_secure, url, engineIO_session=None): + super(XHR_PollingTransport, self).__init__( + http_session, is_secure, url, engineIO_session) + self.http_url = '%s://%s/' % ('https' if is_secure else 'http', url) + self.params = { + 'EIO': ENGINEIO_PROTOCOL, + 'transport': 'polling', + } + if engineIO_session: + self.request_index = 1 + self.kw_get = dict(timeout=engineIO_session.ping_timeout) + self.kw_post = dict(headers={ + 'content-type': 'application/octet-stream', + }) + self.params['sid'] = engineIO_session.id else: - self.close() + self.request_index = 0 + self.kw_get = {} + self.kw_post = {} - def connect(self, path): - self.send_packet(1, path) + def recv_packet(self): + params = dict(self.params) + params['t'] = self._get_timestamp() + response = get_response( + self.http_session.get, + self.http_url, + params=params, + **self.kw_get) + for engineIO_packet in decode_engineIO_content(response.content): + yield engineIO_packet - def send_heartbeat(self): - self.send_packet(2) + def send_packet(self, engineIO_packet_type, engineIO_packet_data): + params = dict(self.params) + params['t'] = self._get_timestamp() + response = get_response( + self.http_session.post, + self.http_url, + params=params, + data=encode_engineIO_content([ + (engineIO_packet_type, engineIO_packet_data), + ]), + **self.kw_post) + assert response.content == b'ok' - def message(self, path, data, callback): - if isinstance(data, six.string_types): - code = 3 - else: - code = 4 - data = json.dumps(data, ensure_ascii=False) - self.send_packet(code, path, data, callback) - - def emit(self, path, event, args, callback): - data = json.dumps(dict(name=event, args=args), ensure_ascii=False) - self.send_packet(5, path, data, callback) - - def ack(self, path, packet_id, *args): - packet_id = packet_id.rstrip('+') - data = '%s+%s' % ( - packet_id, - json.dumps(args, ensure_ascii=False), - ) if args else packet_id - self.send_packet(6, path, data) - - def noop(self, path=''): - self.send_packet(8, path) - - def send_packet(self, code, path='', data='', callback=None): - packet_id = self.set_ack_callback(callback) if callback else '' - packet_parts = str(code), packet_id, path, encode_unicode(data) - packet_text = ':'.join(packet_parts) - self.send(packet_text) - self._log(logging.DEBUG, '[packet sent] %s', packet_text) - - def recv_packet(self, timeout=None): - try: - while self._packets: - yield self._packets.pop(0) - except IndexError: - pass - for packet_text in self.recv(timeout=timeout): - self._log(logging.DEBUG, '[packet received] %s', packet_text) - try: - packet_parts = packet_text.split(':', 3) - except AttributeError: - self._log(logging.WARNING, '[packet error] %s', packet_text) - continue - code, packet_id, path, data = None, None, None, None - packet_count = len(packet_parts) - if 4 == packet_count: - code, packet_id, path, data = packet_parts - elif 3 == packet_count: - code, packet_id, path = packet_parts - elif 1 == packet_count: - code = packet_parts[0] - yield code, packet_id, path, data - - def _enqueue_packet(self, packet): - self._packets.append(packet) - - def set_ack_callback(self, callback): - 'Set callback to be called after server sends an acknowledgment' - self._packet_id += 1 - self._callback_by_packet_id[str(self._packet_id)] = callback - return '%s+' % self._packet_id - - def get_ack_callback(self, packet_id): - 'Get callback to be called after server sends an acknowledgment' - callback = self._callback_by_packet_id[packet_id] - del self._callback_by_packet_id[packet_id] - return callback - - @property - def has_ack_callback(self): - return True if self._callback_by_packet_id else False + def _get_timestamp(self): + timestamp = '%s-%s' % (int(time.time() * 1000), self.request_index) + self.request_index += 1 + return timestamp -class _WebsocketTransport(_AbstractTransport): - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_WebsocketTransport, self).__init__() - url = '%s://%s/websocket/%s' % ( - 'wss' if is_secure else 'ws', - base_url, socketIO_session.id) - self._url = url - http_session = _prepare_http_session(kw) - req = http_session.prepare_request(requests.Request('GET', url)) - headers = ['%s: %s' % item for item in req.headers.items()] - try: - self._connection = websocket.create_connection(url, header=headers) - except socket.timeout as e: - raise ConnectionError(e) - except socket.error as e: - raise ConnectionError(e) - except websocket._exceptions.WebSocketException as e: - raise ConnectionError(e) - self._connection.settimeout(TIMEOUT_IN_SECONDS) - - @property - def connected(self): - return self._connection.connected - - def send(self, packet_text): - try: - self._connection.send(packet_text) - except websocket.WebSocketTimeoutException as e: - message = 'timed out while sending %s (%s)' % (packet_text, e) - self._log(logging.WARNING, message) - raise TimeoutError(e) - except socket.error as e: - message = 'disconnected while sending %s (%s)' % (packet_text, e) - self._log(logging.WARNING, message) - raise ConnectionError(message) - - def recv(self, timeout=None): - if timeout: - self._connection.settimeout(timeout) - try: - yield self._connection.recv() - except websocket.WebSocketTimeoutException as e: - raise TimeoutError(e) - except websocket.SSLError as e: - if 'timed out' in e.message: - raise TimeoutError(e) - else: - raise ConnectionError(e) - except websocket.WebSocketConnectionClosedException as e: - raise ConnectionError('connection closed (%s)' % e) - except socket.error as e: - raise ConnectionError(e) - - def close(self): - self._connection.close() - - -class _XHR_PollingTransport(_AbstractTransport): - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_XHR_PollingTransport, self).__init__() - self._url = '%s://%s/xhr-polling/%s' % ( - 'https' if is_secure else 'http', - base_url, socketIO_session.id) - self._connected = True - self._http_session = _prepare_http_session(kw) - # Create connection - for packet in self.recv_packet(): - self._enqueue_packet(packet) - - @property - def connected(self): - return self._connected - - @property - def _params(self): - return dict(t=int(time.time())) - - def send(self, packet_text): - _get_response( - self._http_session.post, - self._url, - params=self._params, - data=packet_text, - timeout=TIMEOUT_IN_SECONDS) - - def recv(self, timeout=None): - response = _get_response( - self._http_session.get, - self._url, - params=self._params, - timeout=timeout or TIMEOUT_IN_SECONDS, - stream=True) - response_text = response.text - if not response_text.startswith(BOUNDARY): - yield response_text - return - for packet_text in _yield_text_from_framed_data(response_text): - yield packet_text - - def close(self): - _get_response( - self._http_session.get, - self._url, - params=dict(list(self._params.items()) + [('disconnect', True)])) - self._connected = False - - -class _JSONP_PollingTransport(_AbstractTransport): - - RESPONSE_PATTERN = re.compile(r'io.j\[(\d+)\]\("(.*)"\);') - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_JSONP_PollingTransport, self).__init__() - self._url = '%s://%s/jsonp-polling/%s' % ( - 'https' if is_secure else 'http', - base_url, socketIO_session.id) - self._connected = True - self._http_session = _prepare_http_session(kw) - self._id = 0 - # Create connection - for packet in self.recv_packet(): - self._enqueue_packet(packet) - - @property - def connected(self): - return self._connected - - @property - def _params(self): - return dict(t=int(time.time()), i=self._id) - - def send(self, packet_text): - _get_response( - self._http_session.post, - self._url, - params=self._params, - data='d=%s' % requests.utils.quote(json.dumps(packet_text)), - headers={'content-type': 'application/x-www-form-urlencoded'}, - timeout=TIMEOUT_IN_SECONDS) - - def recv(self, timeout=None): - 'Decode the JavaScript response so that we can parse it as JSON' - response = _get_response( - self._http_session.get, - self._url, - params=self._params, - headers={'content-type': 'text/javascript; charset=UTF-8'}, - timeout=timeout or TIMEOUT_IN_SECONDS) - response_text = response.text - try: - self._id, response_text = self.RESPONSE_PATTERN.match( - response_text).groups() - except AttributeError: - self._log(logging.WARNING, '[packet error] %s', response_text) - return - if not response_text.startswith(BOUNDARY): - yield escape_unicode(response_text) - return - for packet_text in _yield_text_from_framed_data( - response_text, escape_unicode): - yield packet_text - - def close(self): - _get_response( - self._http_session.get, - self._url, - params=dict(list(self._params.items()) + [('disconnect', True)])) - self._connected = False - - -def _yield_text_from_framed_data(framed_data, parse=lambda x: x): - parts = [parse(x) for x in framed_data.split(BOUNDARY)] - for text_length, text in zip(parts[1::2], parts[2::2]): - if text_length != str(len(text)): - warning = 'invalid declared length=%s for packet_text=%s' % ( - text_length, text) - _log.warn('[packet error] %s', warning) - continue - yield text - - -def _get_response(request, *args, **kw): +def get_response(request, *args, **kw): try: response = request(*args, **kw) except requests.exceptions.Timeout as e: @@ -323,13 +85,13 @@ def _get_response(request, *args, **kw): raise ConnectionError(e) except requests.exceptions.SSLError as e: raise ConnectionError('could not negotiate SSL (%s)' % e) - status = response.status_code - if 200 != status: - raise ConnectionError('unexpected status code (%s)' % status) + status_code = response.status_code + if 200 != status_code: + raise ConnectionError('unexpected status code (%s)' % status_code) return response -def _prepare_http_session(kw): +def prepare_http_session(kw): http_session = requests.Session() http_session.headers.update(kw.get('headers', {})) http_session.auth = kw.get('auth')