From cd92f1c0cc5a605af16e04b5677bbd8e6ae1812b Mon Sep 17 00:00:00 2001 From: Roy Hyunjin Han Date: Tue, 17 Feb 2015 11:25:18 -0500 Subject: [PATCH] Start from scratch --- CHANGES.rst | 4 + MANIFEST.in | 2 +- README.rst | 7 +- TODO.goals | 4 +- setup.py | 27 +- socketIO_client/__init__.py | 523 +----------------- socketIO_client/exceptions.py | 14 - socketIO_client/tests.py | 225 -------- socketIO_client/tests/__init__.py | 41 ++ socketIO_client/tests/index.html | 4 + .../tests/serve.js | 37 +- socketIO_client/transports.py | 340 ------------ 12 files changed, 105 insertions(+), 1123 deletions(-) delete mode 100644 socketIO_client/exceptions.py delete mode 100644 socketIO_client/tests.py create mode 100644 socketIO_client/tests/__init__.py create mode 100644 socketIO_client/tests/index.html rename serve-tests.js => socketIO_client/tests/serve.js (68%) delete mode 100644 socketIO_client/transports.py diff --git a/CHANGES.rst b/CHANGES.rst index 9fcbea8..b7e7cb1 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,7 @@ +0.6.1 +----- +- Upgraded to socket.io protocol 1.x thanks to Sean Arietta and Joe Palmer + 0.5.5 ----- - Fixed reconnection in the event of server restart diff --git a/MANIFEST.in b/MANIFEST.in index 346209d..abf7482 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,3 @@ recursive-include socketIO_client * -include *.rst +include *.html *.js *.rst global-exclude *.pyc diff --git a/README.rst b/README.rst index 7bc0f48..9dab304 100644 --- a/README.rst +++ b/README.rst @@ -1,4 +1,4 @@ -.. image:: https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=v0.5.4 +.. image:: https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=v0.6.1 :target: https://travis-ci.org/invisibleroads/socketIO-client @@ -30,9 +30,10 @@ Activate isolated environment. :: VIRTUAL_ENV=$HOME/.virtualenv source $VIRTUAL_ENV/bin/activate -Launch your socket.io server. :: +Launch a socket.io server. :: - node serve-tests.js + PACKAGE_FOLDER=`python -c "import os, socketIO_client; print(os.path.dirname(socketIO_client.__file__))"` + node $PACKAGE_FOLDER/tests/serve.js For debugging information, run these commands first. :: diff --git a/TODO.goals b/TODO.goals index a3b2527..84ab464 100644 --- a/TODO.goals +++ b/TODO.goals @@ -1,7 +1,5 @@ Release 0.6.1 #41 #52 - + Update serve-tests.js - + Integrate serve-tests.js changes from sarietta - = Put tests in index.html + Put tests in index.html Update tests Merge sarietta's pull request Revive heartbeat as separate process diff --git a/setup.py b/setup.py index b98da09..bc147cb 100644 --- a/setup.py +++ b/setup.py @@ -1,17 +1,24 @@ -import os -from setuptools import setup, find_packages +from os.path import abspath, dirname, join +from setuptools import find_packages, setup -here = os.path.abspath(os.path.dirname(__file__)) -README = open(os.path.join(here, 'README.rst')).read() -CHANGES = open(os.path.join(here, 'CHANGES.rst')).read() +REQUIREMENTS = [ + 'requests', + 'six', + 'websocket-client', +] +HERE = dirname(abspath(__file__)) +DESCRIPTION = '\n\n'.join(open(join(HERE, _)).read() for _ in [ + 'README.rst', + 'CHANGES.rst', +]) setup( name='socketIO-client', - version='0.5.5', + version='0.6.1', description='A socket.io client library', - long_description=README + '\n\n' + CHANGES, + long_description=DESCRIPTION, license='MIT', classifiers=[ 'Intended Audience :: Developers', @@ -22,11 +29,7 @@ setup( author='Roy Hyunjin Han', author_email='rhh@crosscompute.com', url='https://github.com/invisibleroads/socketIO-client', - install_requires=[ - 'requests', - 'six', - 'websocket-client', - ], + install_requires=REQUIREMENTS, tests_require=[ 'nose', 'coverage', diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index f06b5cb..d5fa6e5 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,522 +1,21 @@ -import logging -import json -import requests -import time -from collections import namedtuple -try: - from urllib.parse import urlparse as parse_url -except ImportError: - from urlparse import urlparse as parse_url - -from .exceptions import ( - SocketIOError, ConnectionError, TimeoutError, PacketError) -from .transports import ( - _get_response, TRANSPORTS, - _WebsocketTransport, _XHR_PollingTransport, _JSONP_PollingTransport) - - __version__ = '0.5.4' -_SocketIOSession = namedtuple('_SocketIOSession', [ - 'id', - 'heartbeat_timeout', - 'server_supported_transports', -]) -_log = logging.getLogger(__name__) -PROTOCOL_VERSION = 1 -RETRY_INTERVAL_IN_SECONDS = 1 + + +class EngineIO(object): + pass + + +class SocketIO(EngineIO): + pass class BaseNamespace(object): - 'Define client behavior' - - def __init__(self, _transport, path): - self._transport = _transport - self.path = path - self._was_connected = False - self._callback_by_event = {} - self.initialize() - - def initialize(self): - 'Initialize custom variables here; you can override this method' - pass - - def message(self, data='', callback=None): - self._transport.message(self.path, data, callback) - - def emit(self, event, *args, **kw): - callback, args = find_callback(args, kw) - self._transport.emit(self.path, event, args, callback) - - def disconnect(self): - self._transport.disconnect(self.path) - - def on(self, event, callback): - 'Define a callback to handle a custom event emitted by the server' - self._callback_by_event[event] = callback - - def on_connect(self): - 'Called after server connects; you can override this method' - pass - - def on_disconnect(self): - 'Called after server disconnects; you can override this method' - pass - - def on_heartbeat(self): - 'Called after server sends a heartbeat; you can override this method' - pass - - def on_message(self, data): - 'Called after server sends a message; you can override this method' - pass - - def on_event(self, event, *args): - """ - Called after server sends an event; you can override this method. - Called only if a custom event handler does not exist, - such as one defined by namespace.on('my_event', my_function). - """ - callback, args = find_callback(args) - if callback: - callback(*args) - - def on_error(self, reason, advice): - 'Called after server sends an error; you can override this method' - pass - - def on_noop(self): - 'Called after server sends a noop; you can override this method' - pass - - def on_open(self, *args): - pass - - def on_close(self, *args): - pass - - def on_retry(self, *args): - pass - - def on_reconnect(self, *args): - pass - - def _find_event_callback(self, event): - # Check callbacks defined by on() - try: - return self._callback_by_event[event] - except KeyError: - pass - # Convert connect to reconnect if we have seen connect already - if event == 'connect': - if not self._was_connected: - self._was_connected = True - else: - event = 'reconnect' - # Check callbacks defined explicitly or use on_event() - return getattr( - self, - 'on_' + event.replace(' ', '_'), - lambda *args: self.on_event(event, *args)) + pass class LoggingNamespace(BaseNamespace): - - def _log(self, level, msg, *attrs): - _log.log(level, '%s: %s' % (self._transport._url, msg), *attrs) - - def on_connect(self): - self._log(logging.DEBUG, '%s [connect]', self.path) - super(LoggingNamespace, self).on_connect() - - def on_disconnect(self): - self._log(logging.DEBUG, '%s [disconnect]', self.path) - super(LoggingNamespace, self).on_disconnect() - - def on_heartbeat(self): - self._log(logging.DEBUG, '%s [heartbeat]', self.path) - super(LoggingNamespace, self).on_heartbeat() - - def on_message(self, data): - self._log(logging.INFO, '%s [message] %s', self.path, data) - super(LoggingNamespace, self).on_message(data) - - def on_event(self, event, *args): - callback, args = find_callback(args) - arguments = [repr(_) for _ in args] - if callback: - arguments.append('callback(*args)') - self._log(logging.INFO, '%s [event] %s(%s)', self.path, event, - ', '.join(arguments)) - super(LoggingNamespace, self).on_event(event, *args) - - def on_error(self, reason, advice): - self._log(logging.INFO, '%s [error] %s', self.path, advice) - super(LoggingNamespace, self).on_error(reason, advice) - - def on_noop(self): - self._log(logging.INFO, '%s [noop]', self.path) - super(LoggingNamespace, self).on_noop() - - def on_open(self, *args): - self._log(logging.INFO, '%s [open] %s', self.path, args) - super(LoggingNamespace, self).on_open(*args) - - def on_close(self, *args): - self._log(logging.INFO, '%s [close] %s', self.path, args) - super(LoggingNamespace, self).on_close(*args) - - def on_retry(self, *args): - self._log(logging.INFO, '%s [retry] %s', self.path, args) - super(LoggingNamespace, self).on_retry(*args) - - def on_reconnect(self, *args): - self._log(logging.INFO, '%s [reconnect] %s', self.path, args) - super(LoggingNamespace, self).on_reconnect(*args) - - -class SocketIO(object): - - """Create a socket.io client that connects to a socket.io server - at the specified host and port. - - - Define the behavior of the client by specifying a custom Namespace. - - Prefix host with https:// to use SSL. - - Set wait_for_connection=True to block until we have a connection. - - Specify desired transports=['websocket', 'xhr-polling']. - - Pass query params, headers, cookies, proxies as keyword arguments. - - SocketIO('localhost', 8000, - params={'q': 'qqq'}, - headers={'Authorization': 'Basic ' + b64encode('username:password')}, - cookies={'a': 'aaa'}, - proxies={'https': 'https://proxy.example.com:8080'}) - """ - - def __init__( - self, host, port=None, Namespace=None, - wait_for_connection=True, transports=TRANSPORTS, - resource='socket.io', **kw): - self.is_secure, self._base_url = _parse_host(host, port, resource) - self.wait_for_connection = wait_for_connection - self._namespace_by_path = {} - self._client_supported_transports = transports - self._kw = kw - if Namespace: - self.define(Namespace) - - def _log(self, level, msg, *attrs): - _log.log(level, '%s: %s' % (self._base_url, msg), *attrs) - - def __enter__(self): - return self - - def __exit__(self, *exception_pack): - self.disconnect() - - def __del__(self): - self.disconnect() - - def define(self, Namespace, path=''): - if path: - self._transport.connect(path) - namespace = Namespace(self._transport, path) - self._namespace_by_path[path] = namespace - return namespace - - def on(self, event, callback, path=''): - if path not in self._namespace_by_path: - self.define(BaseNamespace, path) - return self.get_namespace(path).on(event, callback) - - def message(self, data='', callback=None, path=''): - self._transport.message(path, data, callback) - - def emit(self, event, *args, **kw): - path = kw.get('path', '') - callback, args = find_callback(args, kw) - self._transport.emit(path, event, args, callback) - - def wait(self, seconds=None, for_callbacks=False): - """Wait in a loop and process events as defined in the namespaces. - - - Omit seconds, i.e. call wait() without arguments, to wait forever. - """ - warning_screen = _yield_warning_screen(seconds) - timeout = min(self._heartbeat_interval, seconds) - for elapsed_time in warning_screen: - if self._stop_waiting(for_callbacks): - break - try: - try: - self._process_events(timeout) - except TimeoutError: - pass - next(self._heartbeat_pacemaker) - except ConnectionError as e: - try: - warning = Exception('[connection error] %s' % e) - warning_screen.throw(warning) - except StopIteration: - self._log(logging.WARNING, warning) - try: - namespace = self._namespace_by_path[''] - namespace.on_disconnect() - except KeyError: - pass - - def _process_events(self, timeout=None): - for packet in self._transport.recv_packet(timeout): - try: - self._process_packet(packet) - except PacketError as e: - self._log(logging.WARNING, '[packet error] %s', e) - - def _process_packet(self, packet): - code, packet_id, path, data = packet - namespace = self.get_namespace(path) - delegate = self._get_delegate(code) - delegate(packet, namespace._find_event_callback) - - def _stop_waiting(self, for_callbacks): - # Use __transport to make sure that we do not reconnect inadvertently - if for_callbacks and not self.__transport.has_ack_callback: - return True - if self.__transport._wants_to_disconnect: - return True - return False - - def wait_for_callbacks(self, seconds=None): - self.wait(seconds, for_callbacks=True) - - def disconnect(self, path=''): - try: - self._transport.disconnect(path) - except ReferenceError: - pass - try: - namespace = self._namespace_by_path[path] - namespace.on_disconnect() - del self._namespace_by_path[path] - except KeyError: - pass - - @property - def connected(self): - try: - transport = self.__transport - except AttributeError: - return False - else: - return transport.connected - - @property - def _transport(self): - try: - if self.connected: - return self.__transport - except AttributeError: - pass - socketIO_session = self._get_socketIO_session() - supported_transports = self._get_supported_transports(socketIO_session) - self._heartbeat_pacemaker = self._make_heartbeat_pacemaker( - heartbeat_timeout=socketIO_session.heartbeat_timeout) - next(self._heartbeat_pacemaker) - warning_screen = _yield_warning_screen(seconds=None) - for elapsed_time in warning_screen: - try: - self._transport_name = supported_transports.pop(0) - except IndexError: - raise ConnectionError('Could not negotiate a transport') - try: - self.__transport = self._get_transport( - socketIO_session, self._transport_name) - break - except ConnectionError: - pass - for path, namespace in self._namespace_by_path.items(): - namespace._transport = self.__transport - if path: - self.__transport.connect(path) - return self.__transport - - def _get_socketIO_session(self): - warning_screen = _yield_warning_screen(seconds=None) - for elapsed_time in warning_screen: - try: - return _get_socketIO_session( - self.is_secure, self._base_url, **self._kw) - except ConnectionError as e: - if not self.wait_for_connection: - raise - warning = Exception('[waiting for connection] %s' % e) - try: - warning_screen.throw(warning) - except StopIteration: - self._log(logging.WARNING, warning) - - def _get_supported_transports(self, session): - self._log( - logging.DEBUG, '[transports available] %s', - ' '.join(session.server_supported_transports)) - supported_transports = [ - x for x in self._client_supported_transports if - x in session.server_supported_transports] - if not supported_transports: - raise SocketIOError(' '.join([ - 'could not negotiate a transport:', - 'client supports %s but' % ', '.join( - self._client_supported_transports), - 'server supports %s' % ', '.join( - session.server_supported_transports), - ])) - return supported_transports - - def _get_transport(self, session, transport_name): - self._log(logging.DEBUG, '[transport chosen] %s', transport_name) - return { - 'websocket': _WebsocketTransport, - 'xhr-polling': _XHR_PollingTransport, - 'jsonp-polling': _JSONP_PollingTransport, - }[transport_name](session, self.is_secure, self._base_url, **self._kw) - - def _make_heartbeat_pacemaker(self, heartbeat_timeout): - self._heartbeat_interval = heartbeat_timeout / 2 - heartbeat_time = time.time() - while True: - yield - if time.time() - heartbeat_time > self._heartbeat_interval: - heartbeat_time = time.time() - self._transport.send_heartbeat() - - def get_namespace(self, path=''): - try: - return self._namespace_by_path[path] - except KeyError: - raise PacketError('unhandled namespace path (%s)' % path) - - def _get_delegate(self, code): - try: - return { - '0': self._on_disconnect, - '1': self._on_connect, - '2': self._on_heartbeat, - '3': self._on_message, - '4': self._on_json, - '5': self._on_event, - '6': self._on_ack, - '7': self._on_error, - '8': self._on_noop, - }[code] - except KeyError: - raise PacketError('unexpected code (%s)' % code) - - def _on_disconnect(self, packet, find_event_callback): - find_event_callback('disconnect')() - - def _on_connect(self, packet, find_event_callback): - find_event_callback('connect')() - - def _on_heartbeat(self, packet, find_event_callback): - find_event_callback('heartbeat')() - - def _on_message(self, packet, find_event_callback): - code, packet_id, path, data = packet - args = [data] - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback('message')(*args) - - def _on_json(self, packet, find_event_callback): - code, packet_id, path, data = packet - args = [json.loads(data)] - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback('message')(*args) - - def _on_event(self, packet, find_event_callback): - code, packet_id, path, data = packet - value_by_name = json.loads(data) - event = value_by_name['name'] - args = value_by_name.get('args', []) - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback(event)(*args) - - def _on_ack(self, packet, find_event_callback): - code, packet_id, path, data = packet - data_parts = data.split('+', 1) - packet_id = data_parts[0] - try: - ack_callback = self._transport.get_ack_callback(packet_id) - except KeyError: - return - args = json.loads(data_parts[1]) if len(data_parts) > 1 else [] - ack_callback(*args) - - def _on_error(self, packet, find_event_callback): - code, packet_id, path, data = packet - reason, advice = data.split('+', 1) - find_event_callback('error')(reason, advice) - - def _on_noop(self, packet, find_event_callback): - find_event_callback('noop')() - - def _prepare_to_send_ack(self, path, packet_id): - 'Return function that acknowledges the server' - return lambda *args: self._transport.ack(path, packet_id, *args) + pass def find_callback(args, kw=None): - 'Return callback whether passed as a last argument or as a keyword' - if args and callable(args[-1]): - return args[-1], args[:-1] - try: - return kw['callback'], args - except (KeyError, TypeError): - return None, args - - -def _parse_host(host, port, resource): - if not host.startswith('http'): - host = 'http://' + host - url_pack = parse_url(host) - is_secure = url_pack.scheme == 'https' - port = port or url_pack.port or (443 if is_secure else 80) - base_url = '%s:%d%s/%s/%s' % ( - url_pack.hostname, port, url_pack.path, resource, PROTOCOL_VERSION) - return is_secure, base_url - - -def _yield_warning_screen(seconds=None): - last_warning = None - for elapsed_time in _yield_elapsed_time(seconds): - try: - yield elapsed_time - except Exception as warning: - warning = str(warning) - if last_warning != warning: - last_warning = warning - _log.warn(warning) - time.sleep(RETRY_INTERVAL_IN_SECONDS) - - -def _yield_elapsed_time(seconds=None): - start_time = time.time() - if seconds is None: - while True: - yield time.time() - start_time - while time.time() - start_time < seconds: - yield time.time() - start_time - - -def _get_socketIO_session(is_secure, base_url, **kw): - server_url = '%s://%s/' % ('https' if is_secure else 'http', base_url) - try: - response = _get_response(requests.get, server_url, **kw) - except TimeoutError as e: - raise ConnectionError(e) - response_parts = response.text.split(':') - return _SocketIOSession( - id=response_parts[0], - heartbeat_timeout=int(response_parts[1]), - server_supported_transports=response_parts[3].split(',')) + pass diff --git a/socketIO_client/exceptions.py b/socketIO_client/exceptions.py deleted file mode 100644 index ed2b4d2..0000000 --- a/socketIO_client/exceptions.py +++ /dev/null @@ -1,14 +0,0 @@ -class SocketIOError(Exception): - pass - - -class ConnectionError(SocketIOError): - pass - - -class TimeoutError(SocketIOError): - pass - - -class PacketError(SocketIOError): - pass diff --git a/socketIO_client/tests.py b/socketIO_client/tests.py deleted file mode 100644 index 400d626..0000000 --- a/socketIO_client/tests.py +++ /dev/null @@ -1,225 +0,0 @@ -import logging -import time -from unittest import TestCase - -from . import SocketIO, LoggingNamespace, find_callback -from .transports import TIMEOUT_IN_SECONDS - - -HOST = 'localhost' -PORT = 8000 -DATA = 'xxx' -PAYLOAD = {'xxx': 'yyy'} -logging.basicConfig(level=logging.DEBUG) - - -class BaseMixin(object): - - def setUp(self): - self.called_on_response = False - - def tearDown(self): - del self.socketIO - - def on_response(self, *args): - for arg in args: - if isinstance(arg, dict): - self.assertEqual(arg, PAYLOAD) - else: - self.assertEqual(arg, DATA) - self.called_on_response = True - - def test_disconnect(self): - 'Disconnect' - self.socketIO.define(LoggingNamespace) - self.assertTrue(self.socketIO.connected) - self.socketIO.disconnect() - self.assertFalse(self.socketIO.connected) - # Use context manager - with SocketIO(HOST, PORT, Namespace) as self.socketIO: - namespace = self.socketIO.get_namespace() - self.assertFalse(namespace.called_on_disconnect) - self.assertTrue(self.socketIO.connected) - self.assertTrue(namespace.called_on_disconnect) - self.assertFalse(self.socketIO.connected) - - def test_message(self): - 'Message' - namespace = self.socketIO.define(Namespace) - self.socketIO.message() - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, 'message_response') - - def test_message_with_data(self): - 'Message with data' - namespace = self.socketIO.define(Namespace) - self.socketIO.message(DATA) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, DATA) - - def test_message_with_payload(self): - 'Message with payload' - namespace = self.socketIO.define(Namespace) - self.socketIO.message(PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, PAYLOAD) - - def test_message_with_callback(self): - 'Message with callback' - self.socketIO.define(LoggingNamespace) - self.socketIO.message(callback=self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - - def test_message_with_callback_with_data(self): - 'Message with callback with data' - self.socketIO.define(LoggingNamespace) - self.socketIO.message(DATA, self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - - def test_emit(self): - 'Emit' - namespace = self.socketIO.define(Namespace) - self.socketIO.emit('emit') - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.args_by_event, { - 'emit_response': (), - }) - - def test_emit_with_payload(self): - 'Emit with payload' - namespace = self.socketIO.define(Namespace) - self.socketIO.emit('emit_with_payload', PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.args_by_event, { - 'emit_with_payload_response': (PAYLOAD,), - }) - - def test_emit_with_multiple_payloads(self): - 'Emit with multiple payloads' - namespace = self.socketIO.define(Namespace) - self.socketIO.emit('emit_with_multiple_payloads', PAYLOAD, PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.args_by_event, { - 'emit_with_multiple_payloads_response': (PAYLOAD, PAYLOAD), - }) - - def test_emit_with_callback(self): - 'Emit with callback' - self.socketIO.define(LoggingNamespace) - self.socketIO.emit('emit_with_callback', self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - - def test_emit_with_callback_with_payload(self): - 'Emit with callback with payload' - self.socketIO.define(LoggingNamespace) - self.socketIO.emit( - 'emit_with_callback_with_payload', self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - - def test_emit_with_callback_with_multiple_payloads(self): - 'Emit with callback with multiple payloads' - self.socketIO.define(LoggingNamespace) - self.socketIO.emit( - 'emit_with_callback_with_multiple_payloads', self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - - def test_emit_with_event(self): - 'Emit to trigger an event' - self.socketIO.on('emit_with_event_response', self.on_response) - self.socketIO.emit('emit_with_event', PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - - def test_ack(self): - 'Trigger server callback' - namespace = self.socketIO.define(Namespace) - self.socketIO.emit('ack', PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.args_by_event, { - 'ack_response': (PAYLOAD,), - 'ack_callback_response': (PAYLOAD,), - }) - - def test_wait_with_disconnect(self): - 'Exit loop when the client wants to disconnect' - self.socketIO.define(Namespace) - self.socketIO.emit('wait_with_disconnect') - timeout_in_seconds = 5 - start_time = time.time() - self.socketIO.wait(timeout_in_seconds) - self.assertTrue(time.time() - start_time < timeout_in_seconds) - - def test_namespace_emit(self): - 'Behave differently in different namespaces' - main_namespace = self.socketIO.define(Namespace) - chat_namespace = self.socketIO.define(Namespace, '/chat') - news_namespace = self.socketIO.define(Namespace, '/news') - news_namespace.emit('emit_with_payload', PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(main_namespace.args_by_event, {}) - self.assertEqual(chat_namespace.args_by_event, {}) - self.assertEqual(news_namespace.args_by_event, { - 'emit_with_payload_response': (PAYLOAD,), - }) - - def test_namespace_ack(self): - 'Trigger server callback' - chat_namespace = self.socketIO.define(Namespace, '/chat') - chat_namespace.emit('ack', PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(chat_namespace.args_by_event, { - 'ack_response': (PAYLOAD,), - 'ack_callback_response': (PAYLOAD,), - }) - - -class Test_WebsocketTransport(TestCase, BaseMixin): - - def setUp(self): - super(Test_WebsocketTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['websocket']) - self.wait_time_in_seconds = 0.1 - - -class Test_XHR_PollingTransport(TestCase, BaseMixin): - - def setUp(self): - super(Test_XHR_PollingTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['xhr-polling']) - self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 - - -class Test_JSONP_PollingTransport(TestCase, BaseMixin): - - def setUp(self): - super(Test_JSONP_PollingTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['jsonp-polling']) - self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 - - -class Namespace(LoggingNamespace): - - def initialize(self): - self.response = None - self.args_by_event = {} - self.called_on_disconnect = False - - def on_disconnect(self): - self.called_on_disconnect = True - - def on_message(self, data): - self.response = data - - def on_event(self, event, *args): - callback, args = find_callback(args) - if callback: - callback(*args) - self.args_by_event[event] = args - - def on_wait_with_disconnect_response(self): - self.disconnect() diff --git a/socketIO_client/tests/__init__.py b/socketIO_client/tests/__init__.py new file mode 100644 index 0000000..d7f6c61 --- /dev/null +++ b/socketIO_client/tests/__init__.py @@ -0,0 +1,41 @@ +import logging +from unittest import TestCase + +from .. import SocketIO, LoggingNamespace, find_callback + + +HOST = 'localhost' +PORT = 8000 +logging.basicConfig(level=logging.DEBUG) + + +class BaseMixin(object): + + def test_emit(self): + 'Emit' + namespace = self.socketIO.define(Namespace) + self.socketIO.emit('emit') + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.args_by_event, { + 'emit_response': (), + }) + + +class Test_XHR_PollingTransport(TestCase, BaseMixin): + + def setUp(self): + super(Test_XHR_PollingTransport, self).setUp() + self.socketIO = SocketIO(HOST, PORT, transports=['xhr-polling']) + self.wait_time_in_seconds = 1 + + +class Namespace(LoggingNamespace): + + def initialize(self): + self.args_by_event = {} + + def on_event(self, event, *args): + callback, args = find_callback(args) + if callback: + callback(*args) + self.args_by_event[event] = args diff --git a/socketIO_client/tests/index.html b/socketIO_client/tests/index.html new file mode 100644 index 0000000..86299b8 --- /dev/null +++ b/socketIO_client/tests/index.html @@ -0,0 +1,4 @@ + + diff --git a/serve-tests.js b/socketIO_client/tests/serve.js similarity index 68% rename from serve-tests.js rename to socketIO_client/tests/serve.js index 6c2edfc..7dd9232 100644 --- a/serve-tests.js +++ b/socketIO_client/tests/serve.js @@ -1,17 +1,25 @@ -var io = require('socket.io').listen(8000); +// DEBUG=* node serve.js -var main = io.of('').on('connection', function(socket) { +var app = require('http').createServer(serve).listen(8000); +var io = require('socket.io')(app); +var fs = require('fs'); +var PAYLOAD = {'xxx': 'yyy'}; + +io.on('connection', function(socket) { socket.on('message', function(data, fn) { - if (fn) { // Client expects a callback + if (fn) { + // Client requests callback if (data) { fn(data); } else { fn(); } } else if (typeof data === 'object') { - socket.json.send(data ? data : 'message_response'); // object or null + // Data has type object or is null + socket.json.send(data ? data : 'message_response'); } else { - socket.send(data ? data : 'message_response'); // string or '' + // Data has type string or is '' + socket.send(data ? data : 'message_response'); } }); socket.on('emit', function() { @@ -20,8 +28,8 @@ var main = io.of('').on('connection', function(socket) { socket.on('emit_with_payload', function(payload) { socket.emit('emit_with_payload_response', payload); }); - socket.on('emit_with_multiple_payloads', function(payload, payload) { - socket.emit('emit_with_multiple_payloads_response', payload, payload); + socket.on('emit_with_multiple_payloads', function(payload1, payload2) { + socket.emit('emit_with_multiple_payloads_response', payload1, payload2); }); socket.on('emit_with_callback', function(fn) { fn(); @@ -44,16 +52,14 @@ var main = io.of('').on('connection', function(socket) { socket.emit('aaa_response', PAYLOAD); }); socket.on('bbb', function(payload, fn) { - if (fn) { - fn(payload); - } + if (fn) fn(payload); }); socket.on('wait_with_disconnect', function() { socket.emit('wait_with_disconnect_response'); }); }); -var chat = io.of('/chat').on('connection', function (socket) { +io.of('/chat').on('connection', function(socket) { socket.on('emit_with_payload', function(payload) { socket.emit('emit_with_payload_response', payload); }); @@ -67,7 +73,7 @@ var chat = io.of('/chat').on('connection', function (socket) { }); }); -var news = io.of('/news').on('connection', function (socket) { +io.of('/news').on('connection', function(socket) { socket.on('emit_with_payload', function(payload) { socket.emit('emit_with_payload_response', payload); }); @@ -76,4 +82,9 @@ var news = io.of('/news').on('connection', function (socket) { }); }); -var PAYLOAD = {'xxx': 'yyy'}; +function serve(req, res) { + fs.readFile(__dirname + '/index.html', function(err, data) { + res.writeHead(200); + res.end(data); + }); +} diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py deleted file mode 100644 index 2887efa..0000000 --- a/socketIO_client/transports.py +++ /dev/null @@ -1,340 +0,0 @@ -import codecs -import json -import logging -import re -import requests -import six -import socket -import sys -import time -import websocket - -from .exceptions import ConnectionError, TimeoutError - - -if not hasattr(websocket, 'create_connection'): - sys.exit("""Incompatible websocket implementation -- Please make sure that you have websocket-client installed -- Please remove other websocket implementations""") - - -TRANSPORTS = 'websocket', 'xhr-polling', 'jsonp-polling' -BOUNDARY = six.u('\ufffd') -TIMEOUT_IN_SECONDS = 3 -_log = logging.getLogger(__name__) -escape_unicode = lambda x: codecs.getdecoder('unicode_escape')(x)[0] -try: - unicode -except NameError: - encode_unicode = lambda x: x -else: - encode_unicode = lambda x: unicode(x).encode('utf-8') - - -class _AbstractTransport(object): - - def __init__(self): - self._packet_id = 0 - self._callback_by_packet_id = {} - self._wants_to_disconnect = False - self._packets = [] - - def _log(self, level, msg, *attrs): - _log.log(level, '[%s] %s' % (self._url, msg), *attrs) - - def disconnect(self, path=''): - if not path: - self._wants_to_disconnect = True - if not self.connected: - return - if path: - self.send_packet(0, path) - else: - self.close() - - def connect(self, path): - self.send_packet(1, path) - - def send_heartbeat(self): - self.send_packet(2) - - def message(self, path, data, callback): - if isinstance(data, six.string_types): - code = 3 - else: - code = 4 - data = json.dumps(data, ensure_ascii=False) - self.send_packet(code, path, data, callback) - - def emit(self, path, event, args, callback): - data = json.dumps(dict(name=event, args=args), ensure_ascii=False) - self.send_packet(5, path, data, callback) - - def ack(self, path, packet_id, *args): - packet_id = packet_id.rstrip('+') - data = '%s+%s' % ( - packet_id, - json.dumps(args, ensure_ascii=False), - ) if args else packet_id - self.send_packet(6, path, data) - - def noop(self, path=''): - self.send_packet(8, path) - - def send_packet(self, code, path='', data='', callback=None): - packet_id = self.set_ack_callback(callback) if callback else '' - packet_parts = str(code), packet_id, path, encode_unicode(data) - packet_text = ':'.join(packet_parts) - self.send(packet_text) - self._log(logging.DEBUG, '[packet sent] %s', packet_text) - - def recv_packet(self, timeout=None): - try: - while self._packets: - yield self._packets.pop(0) - except IndexError: - pass - for packet_text in self.recv(timeout=timeout): - self._log(logging.DEBUG, '[packet received] %s', packet_text) - try: - packet_parts = packet_text.split(':', 3) - except AttributeError: - self._log(logging.WARNING, '[packet error] %s', packet_text) - continue - code, packet_id, path, data = None, None, None, None - packet_count = len(packet_parts) - if 4 == packet_count: - code, packet_id, path, data = packet_parts - elif 3 == packet_count: - code, packet_id, path = packet_parts - elif 1 == packet_count: - code = packet_parts[0] - yield code, packet_id, path, data - - def _enqueue_packet(self, packet): - self._packets.append(packet) - - def set_ack_callback(self, callback): - 'Set callback to be called after server sends an acknowledgment' - self._packet_id += 1 - self._callback_by_packet_id[str(self._packet_id)] = callback - return '%s+' % self._packet_id - - def get_ack_callback(self, packet_id): - 'Get callback to be called after server sends an acknowledgment' - callback = self._callback_by_packet_id[packet_id] - del self._callback_by_packet_id[packet_id] - return callback - - @property - def has_ack_callback(self): - return True if self._callback_by_packet_id else False - - -class _WebsocketTransport(_AbstractTransport): - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_WebsocketTransport, self).__init__() - url = '%s://%s/websocket/%s' % ( - 'wss' if is_secure else 'ws', - base_url, socketIO_session.id) - self._url = url - http_session = _prepare_http_session(kw) - req = http_session.prepare_request(requests.Request('GET', url)) - headers = ['%s: %s' % item for item in req.headers.items()] - try: - self._connection = websocket.create_connection(url, header=headers) - except socket.timeout as e: - raise ConnectionError(e) - except socket.error as e: - raise ConnectionError(e) - self._connection.settimeout(TIMEOUT_IN_SECONDS) - - @property - def connected(self): - return self._connection.connected - - def send(self, packet_text): - try: - self._connection.send(packet_text) - except websocket.WebSocketTimeoutException as e: - message = 'timed out while sending %s (%s)' % (packet_text, e) - self._log(logging.WARNING, message) - raise TimeoutError(e) - except socket.error as e: - message = 'disconnected while sending %s (%s)' % (packet_text, e) - self._log(logging.WARNING, message) - raise ConnectionError(message) - - def recv(self, timeout=None): - if timeout: - self._connection.settimeout(timeout) - try: - yield self._connection.recv() - except websocket.WebSocketTimeoutException as e: - raise TimeoutError(e) - except websocket.SSLError as e: - if 'timed out' in e.message: - raise TimeoutError(e) - else: - raise ConnectionError(e) - except websocket.WebSocketConnectionClosedException as e: - raise ConnectionError('connection closed (%s)' % e) - except socket.error as e: - raise ConnectionError(e) - - def close(self): - self._connection.close() - - -class _XHR_PollingTransport(_AbstractTransport): - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_XHR_PollingTransport, self).__init__() - self._url = '%s://%s/xhr-polling/%s' % ( - 'https' if is_secure else 'http', - base_url, socketIO_session.id) - self._connected = True - self._http_session = _prepare_http_session(kw) - # Create connection - for packet in self.recv_packet(): - self._enqueue_packet(packet) - - @property - def connected(self): - return self._connected - - @property - def _params(self): - return dict(t=int(time.time())) - - def send(self, packet_text): - _get_response( - self._http_session.post, - self._url, - params=self._params, - data=packet_text, - timeout=TIMEOUT_IN_SECONDS) - - def recv(self, timeout=None): - response = _get_response( - self._http_session.get, - self._url, - params=self._params, - timeout=timeout or TIMEOUT_IN_SECONDS, - stream=True) - response_text = response.text - if not response_text.startswith(BOUNDARY): - yield response_text - return - for packet_text in _yield_text_from_framed_data(response_text): - yield packet_text - - def close(self): - _get_response( - self._http_session.get, - self._url, - params=dict(list(self._params.items()) + [('disconnect', True)])) - self._connected = False - - -class _JSONP_PollingTransport(_AbstractTransport): - - RESPONSE_PATTERN = re.compile(r'io.j\[(\d+)\]\("(.*)"\);') - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_JSONP_PollingTransport, self).__init__() - self._url = '%s://%s/jsonp-polling/%s' % ( - 'https' if is_secure else 'http', - base_url, socketIO_session.id) - self._connected = True - self._http_session = _prepare_http_session(kw) - self._id = 0 - # Create connection - for packet in self.recv_packet(): - self._enqueue_packet(packet) - - @property - def connected(self): - return self._connected - - @property - def _params(self): - return dict(t=int(time.time()), i=self._id) - - def send(self, packet_text): - _get_response( - self._http_session.post, - self._url, - params=self._params, - data='d=%s' % requests.utils.quote(json.dumps(packet_text)), - headers={'content-type': 'application/x-www-form-urlencoded'}, - timeout=TIMEOUT_IN_SECONDS) - - def recv(self, timeout=None): - 'Decode the JavaScript response so that we can parse it as JSON' - response = _get_response( - self._http_session.get, - self._url, - params=self._params, - headers={'content-type': 'text/javascript; charset=UTF-8'}, - timeout=timeout or TIMEOUT_IN_SECONDS) - response_text = response.text - try: - self._id, response_text = self.RESPONSE_PATTERN.match( - response_text).groups() - except AttributeError: - self._log(logging.WARNING, '[packet error] %s', response_text) - return - if not response_text.startswith(BOUNDARY): - yield escape_unicode(response_text) - return - for packet_text in _yield_text_from_framed_data( - response_text, escape_unicode): - yield packet_text - - def close(self): - _get_response( - self._http_session.get, - self._url, - params=dict(list(self._params.items()) + [('disconnect', True)])) - self._connected = False - - -def _yield_text_from_framed_data(framed_data, parse=lambda x: x): - parts = [parse(x) for x in framed_data.split(BOUNDARY)] - for text_length, text in zip(parts[1::2], parts[2::2]): - if text_length != str(len(text)): - warning = 'invalid declared length=%s for packet_text=%s' % ( - text_length, text) - _log.warn('[packet error] %s', warning) - continue - yield text - - -def _get_response(request, *args, **kw): - try: - response = request(*args, **kw) - except requests.exceptions.Timeout as e: - raise TimeoutError(e) - except requests.exceptions.ConnectionError as e: - raise ConnectionError(e) - except requests.exceptions.SSLError as e: - raise ConnectionError('could not negotiate SSL (%s)' % e) - status = response.status_code - if 200 != status: - raise ConnectionError('unexpected status code (%s)' % status) - return response - - -def _prepare_http_session(kw): - http_session = requests.Session() - http_session.headers.update(kw.get('headers', {})) - http_session.auth = kw.get('auth') - http_session.proxies.update(kw.get('proxies', {})) - http_session.hooks.update(kw.get('hooks', {})) - http_session.params.update(kw.get('params', {})) - http_session.verify = kw.get('verify') - http_session.cert = kw.get('cert') - http_session.cookies.update(kw.get('cookies', {})) - return http_session