diff --git a/.travis.yml b/.travis.yml index e6415c3..1db3eb1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,12 +7,13 @@ before_install: - sudo apt-get update - sudo apt-get install nodejs install: - - npm install -G socket.io@0.9 + - npm install -G socket.io + - npm install -G yargs - pip install -U requests - pip install -U six - pip install -U websocket-client - pip install -U coverage before_script: - - node serve-tests.js & + - DEBUG=* node socketIO_client/tests/serve.js & - sleep 3 script: nosetests diff --git a/CHANGES.rst b/CHANGES.rst index b4dbbab..8970d11 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,22 @@ +0.6.5 +----- +- Updated wait loop to be more responsive under websocket transport + +0.6.4 +----- +- Fixed support for Python 3 +- Fixed thread cleanup + +0.6.3 +----- +- Upgraded to socket.io protocol 1.x for websocket transport +- Added locks to fix concurrency issues with polling transport +- Fixed SSL support + +0.6.1 +----- +- Upgraded to socket.io protocol 1.x thanks to Sean Arietta and Joe Palmer + 0.5.6 ----- - Backported to support requests 0.8.2 diff --git a/LICENSE b/LICENSE index 4e68cea..dca6c1b 100644 --- a/LICENSE +++ b/LICENSE @@ -1,7 +1,19 @@ Copyright (c) 2013 Roy Hyunjin Han and contributors -Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: -The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/MANIFEST.in b/MANIFEST.in index 346209d..abf7482 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,3 @@ recursive-include socketIO_client * -include *.rst +include *.html *.js *.rst global-exclude *.pyc diff --git a/README.rst b/README.rst index b864c88..4f544b2 100644 --- a/README.rst +++ b/README.rst @@ -1,4 +1,4 @@ -.. image:: https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=v0.5.4 +.. image:: https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=master :target: https://travis-ci.org/invisibleroads/socketIO-client @@ -6,7 +6,7 @@ socketIO-client =============== Here is a `socket.io `_ client library for Python. You can use it to write test code for your socket.io server. -Please note that this version implements `socket.io protocol 0.9 `_, which is compatible with `gevent-socketio `_. If you want to communicate using `socket.io protocol 1.x `_, please use `socketIO-client 0.6.3 `_ or higher. +Please note that this version implements `socket.io protocol 1.x `_, which is not backwards compatible. If you want to communicate using `socket.io protocol 0.9 `_ (which is compatible with `gevent-socketio `_), please use `socketIO-client 0.5.6 `_. Installation @@ -34,12 +34,18 @@ Activate isolated environment. :: Launch your socket.io server. :: - npm install -g socket.io@0.9 - node serve-tests.js + # Get package folder + PACKAGE_FOLDER=`python -c "import os, socketIO_client;\ + print(os.path.dirname(socketIO_client.__file__))"` + # Start socket.io server + DEBUG=* node $PACKAGE_FOLDER/tests/serve.js + # Start proxy server in a separate terminal on the same machine + DEBUG=* node $PACKAGE_FOLDER/tests/proxy.js For debugging information, run these commands first. :: import logging + logging.getLogger('requests').setLevel(logging.WARNING) logging.basicConfig(level=logging.DEBUG) Emit. :: @@ -55,7 +61,7 @@ Emit with callback. :: from socketIO_client import SocketIO, LoggingNamespace def on_bbb_response(*args): - print 'on_bbb_response', args + print('on_bbb_response', args) with SocketIO('localhost', 8000, LoggingNamespace) as socketIO: socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response) @@ -66,7 +72,7 @@ Define events. :: from socketIO_client import SocketIO, LoggingNamespace def on_aaa_response(*args): - print 'on_aaa_response', args + print('on_aaa_response', args) socketIO = SocketIO('localhost', 8000, LoggingNamespace) socketIO.on('aaa_response', on_aaa_response) @@ -80,7 +86,7 @@ Define events in a namespace. :: class Namespace(BaseNamespace): def on_aaa_response(self, *args): - print 'on_aaa_response', args + print('on_aaa_response', args) self.emit('bbb') socketIO = SocketIO('localhost', 8000, Namespace) @@ -94,7 +100,7 @@ Define standard events. :: class Namespace(BaseNamespace): def on_connect(self): - print '[Connected]' + print('[Connected]') socketIO = SocketIO('localhost', 8000, Namespace) socketIO.wait(seconds=1) @@ -106,12 +112,12 @@ Define different namespaces on a single socket. :: class ChatNamespace(BaseNamespace): def on_aaa_response(self, *args): - print 'on_aaa_response', args + print('on_aaa_response', args) class NewsNamespace(BaseNamespace): def on_aaa_response(self, *args): - print 'on_aaa_response', args + print('on_aaa_response', args) socketIO = SocketIO('localhost', 8000) chat_namespace = socketIO.define(ChatNamespace, '/chat') @@ -132,7 +138,8 @@ Specify params, headers, cookies, proxies thanks to the `requests `_ wrote the `socket.io specification `_. +- `Guillermo Rauch `_ wrote the `socket.io specification `_. - `Hiroki Ohtani `_ wrote `websocket-client `_. - `rod `_ wrote a `prototype for a Python client to a socket.io server `_. - `Alexandre Bourget `_ wrote `gevent-socketio `_, which is a socket.io server written in Python. - `Paul Kienzle `_, `Zac Lee `_, `Josh VanderLinden `_, `Ian Fitzpatrick `_, `Lucas Klein `_, `Rui Chicoria `_, `Travis Odom `_, `Patrick Huber `_, `Brad Campbell `_, `Daniel `_, `Sean Arietta `_ submitted code to expand support of the socket.io protocol. - `Bernard Pratz `_, `Francis Bull `_ wrote prototypes to support xhr-polling and jsonp-polling. -- `Eric Chen `_, `Denis Zinevich `_, `Thiago Hersan `_, `Nayef Copty `_, `Jörgen Karlsson `_, `Branden Ghena `_, `Tim Landscheidt `_ suggested ways to make the connection more robust. +- `Eric Chen `_, `Denis Zinevich `_, `Thiago Hersan `_, `Nayef Copty `_, `Jörgen Karlsson `_, `Branden Ghena `_, `Tim Landscheidt `_, `Matt Porritt `_ suggested ways to make the connection more robust. - `Merlijn van Deen `_, `Frederic Sureau `_, `Marcus Cobden `_, `Drew Hutchison `_, `wuurrd `_, `Adam Kecer `_, `Alex Monk `_, `Vishal P R `_, `John Vandenberg `_, `Thomas Grainger `_ proposed changes that make the library more friendly and practical for you! diff --git a/TODO.goals b/TODO.goals index e69de29..d72b083 100644 --- a/TODO.goals +++ b/TODO.goals @@ -0,0 +1,48 @@ += Consider supporting both protocols in the same library + https://github.com/invisibleroads/socketIO-client/issues/95 +Add binary support + https://github.com/invisibleroads/socketIO-client/pull/85 + https://github.com/invisibleroads/socketIO-client/issues/70 + https://github.com/invisibleroads/socketIO-client/issues/71 + https://github.com/invisibleroads/socketIO-client/issues/91 +Check requests dependency + https://github.com/invisibleroads/socketIO-client/issues/92 + https://github.com/invisibleroads/socketIO-client/pull/93 + https://github.com/invisibleroads/socketIO-client/commit/b288d89c15d452a30bfeb00f38494f59f71f5a43 +Check SSL + https://github.com/invisibleroads/socketIO-client/issues/86 + https://github.com/invisibleroads/socketIO-client/pull/87 +Look into invalid namespace handling + https://github.com/invisibleroads/socketIO-client/issues/84 +Check unicode issues + https://github.com/invisibleroads/socketIO-client/issues/81 +Check python3 support for socketIO-client 0.5.6 + https://github.com/invisibleroads/socketIO-client/issues/83 +Check OK assertion + https://github.com/invisibleroads/socketIO-client/issues/99 + https://github.com/invisibleroads/socketIO-client/pull/103 +Check why connected=True after termination + https://github.com/invisibleroads/socketIO-client/issues/80 + https://github.com/invisibleroads/socketIO-client/issues/98 +Consider catching heartbeat thread exception + https://github.com/invisibleroads/socketIO-client/issues/100 +Check why it blocks when defining a namespace + https://github.com/invisibleroads/socketIO-client/issues/96 +Check why transports are not being set + https://github.com/invisibleroads/socketIO-client/issues/102 +Look at 404 not found error + https://github.com/invisibleroads/socketIO-client/issues/101 +Look at socketio off and socketio once + https://github.com/invisibleroads/socketIO-client/pull/94 +Implement rooms + https://github.com/invisibleroads/socketIO-client/issues/72 + https://github.com/invisibleroads/socketIO-client/pull/65 +Check tests + https://github.com/invisibleroads/socketIO-client/issues/90 +Check whether it works on Windows 8 + https://github.com/invisibleroads/socketIO-client/issues/97 +Add debian packaging support + https://github.com/invisibleroads/socketIO-client/pull/89 + ++ Review issues and pull requests ++ Order issues diff --git a/serve-tests.js b/serve-tests.js deleted file mode 100644 index 6c2edfc..0000000 --- a/serve-tests.js +++ /dev/null @@ -1,79 +0,0 @@ -var io = require('socket.io').listen(8000); - -var main = io.of('').on('connection', function(socket) { - socket.on('message', function(data, fn) { - if (fn) { // Client expects a callback - if (data) { - fn(data); - } else { - fn(); - } - } else if (typeof data === 'object') { - socket.json.send(data ? data : 'message_response'); // object or null - } else { - socket.send(data ? data : 'message_response'); // string or '' - } - }); - socket.on('emit', function() { - socket.emit('emit_response'); - }); - socket.on('emit_with_payload', function(payload) { - socket.emit('emit_with_payload_response', payload); - }); - socket.on('emit_with_multiple_payloads', function(payload, payload) { - socket.emit('emit_with_multiple_payloads_response', payload, payload); - }); - socket.on('emit_with_callback', function(fn) { - fn(); - }); - socket.on('emit_with_callback_with_payload', function(fn) { - fn(PAYLOAD); - }); - socket.on('emit_with_callback_with_multiple_payloads', function(fn) { - fn(PAYLOAD, PAYLOAD); - }); - socket.on('emit_with_event', function(payload) { - socket.emit('emit_with_event_response', payload); - }); - socket.on('ack', function(payload) { - socket.emit('ack_response', payload, function(payload) { - socket.emit('ack_callback_response', payload); - }); - }); - socket.on('aaa', function() { - socket.emit('aaa_response', PAYLOAD); - }); - socket.on('bbb', function(payload, fn) { - if (fn) { - fn(payload); - } - }); - socket.on('wait_with_disconnect', function() { - socket.emit('wait_with_disconnect_response'); - }); -}); - -var chat = io.of('/chat').on('connection', function (socket) { - socket.on('emit_with_payload', function(payload) { - socket.emit('emit_with_payload_response', payload); - }); - socket.on('aaa', function() { - socket.emit('aaa_response', 'in chat'); - }); - socket.on('ack', function(payload) { - socket.emit('ack_response', payload, function(payload) { - socket.emit('ack_callback_response', payload); - }); - }); -}); - -var news = io.of('/news').on('connection', function (socket) { - socket.on('emit_with_payload', function(payload) { - socket.emit('emit_with_payload_response', payload); - }); - socket.on('aaa', function() { - socket.emit('aaa_response', 'in news'); - }); -}); - -var PAYLOAD = {'xxx': 'yyy'}; diff --git a/setup.py b/setup.py index 7a7567b..a2a00e3 100644 --- a/setup.py +++ b/setup.py @@ -1,32 +1,38 @@ -import os -from setuptools import setup, find_packages +import io +from os.path import abspath, dirname, join +from setuptools import find_packages, setup -here = os.path.abspath(os.path.dirname(__file__)) -README = open(os.path.join(here, 'README.rst')).read() -CHANGES = open(os.path.join(here, 'CHANGES.rst')).read() +REQUIREMENTS = [ + 'requests', + 'six', + 'websocket-client', +] +HERE = dirname(abspath(__file__)) +LOAD_TEXT = lambda name: io.open(join(HERE, name), encoding='UTF-8').read() +DESCRIPTION = '\n\n'.join(LOAD_TEXT(_) for _ in [ + 'README.rst', + 'CHANGES.rst', +]) setup( - name='socketIO-client', - version='0.5.6', + name='socketIO_client', + version='0.6.5', description='A socket.io client library', - long_description=README + '\n\n' + CHANGES, + long_description=DESCRIPTION, license='MIT', classifiers=[ 'Intended Audience :: Developers', 'Programming Language :: Python', 'License :: OSI Approved :: MIT License', + 'Development Status :: 5 - Production/Stable', ], keywords='socket.io node.js', author='Roy Hyunjin Han', author_email='rhh@crosscompute.com', url='https://github.com/invisibleroads/socketIO-client', - install_requires=[ - 'requests', - 'six', - 'websocket-client', - ], + install_requires=REQUIREMENTS, tests_require=[ 'nose', 'coverage', diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 208e139..fbf2c7d 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,182 +1,307 @@ -import logging -import json -import requests -import time -from collections import namedtuple -try: - from urllib.parse import urlparse as parse_url -except ImportError: - from urlparse import urlparse as parse_url - -from .exceptions import ( - SocketIOError, ConnectionError, TimeoutError, PacketError) -from .symmetries import _get_text +from .exceptions import ConnectionError, TimeoutError, PacketError +from .heartbeats import HeartbeatThread +from .logs import LoggingMixin +from .namespaces import ( + EngineIONamespace, SocketIONamespace, LoggingSocketIONamespace, + find_callback) +from .parsers import ( + parse_host, parse_engineIO_session, + format_socketIO_packet_data, parse_socketIO_packet_data, + get_namespace_path) +from .symmetries import get_character from .transports import ( - _get_response, TRANSPORTS, - _WebsocketTransport, _XHR_PollingTransport, _JSONP_PollingTransport) + WebsocketTransport, XHR_PollingTransport, prepare_http_session, TRANSPORTS) -__version__ = '0.5.4' -_SocketIOSession = namedtuple('_SocketIOSession', [ - 'id', - 'heartbeat_timeout', - 'server_supported_transports', -]) -_log = logging.getLogger(__name__) -PROTOCOL_VERSION = 1 -RETRY_INTERVAL_IN_SECONDS = 1 +__all__ = 'SocketIO', 'SocketIONamespace' +__version__ = '0.6.3' +BaseNamespace = SocketIONamespace +LoggingNamespace = LoggingSocketIONamespace -class BaseNamespace(object): - 'Define client behavior' +def retry(f): + def wrap(*args, **kw): + self = args[0] + try: + return f(*args, **kw) + except (TimeoutError, ConnectionError): + self._opened = False + return f(*args, **kw) + return wrap - def __init__(self, _transport, path): - self._transport = _transport - self.path = path - self._was_connected = False - self._callback_by_event = {} - self.initialize() - def initialize(self): - 'Initialize custom variables here; you can override this method' +class EngineIO(LoggingMixin): + + def __init__( + self, host, port=None, Namespace=EngineIONamespace, + wait_for_connection=True, transports=TRANSPORTS, + resource='engine.io', hurry_interval_in_seconds=1, **kw): + self._is_secure, self._url = parse_host(host, port, resource) + self._wait_for_connection = wait_for_connection + self._client_transports = transports + self._hurry_interval_in_seconds = hurry_interval_in_seconds + self._http_session = prepare_http_session(kw) + + self._log_name = self._url + self._wants_to_close = False + self._opened = False + + if Namespace: + self.define(Namespace) + self._transport + + # Connect + + @property + def _transport(self): + if self._opened: + return self._transport_instance + self._engineIO_session = self._get_engineIO_session() + self._negotiate_transport() + self._connect_namespaces() + self._opened = True + self._reset_heartbeat() + return self._transport_instance + + def _get_engineIO_session(self): + warning_screen = self._yield_warning_screen() + for elapsed_time in warning_screen: + transport = XHR_PollingTransport( + self._http_session, self._is_secure, self._url) + try: + engineIO_packet_type, engineIO_packet_data = next( + transport.recv_packet()) + break + except (TimeoutError, ConnectionError) as e: + if not self._wait_for_connection: + raise + warning = Exception('[waiting for connection] %s' % e) + warning_screen.throw(warning) + assert engineIO_packet_type == 0 # engineIO_packet_type == open + return parse_engineIO_session(engineIO_packet_data) + + def _negotiate_transport(self): + self._transport_instance = self._get_transport('xhr-polling') + self.transport_name = 'xhr-polling' + is_ws_client = 'websocket' in self._client_transports + is_ws_server = 'websocket' in self._engineIO_session.transport_upgrades + if is_ws_client and is_ws_server: + try: + transport = self._get_transport('websocket') + transport.send_packet(2, 'probe') + for packet_type, packet_data in transport.recv_packet(): + if packet_type == 3 and packet_data == b'probe': + transport.send_packet(5, '') + self._transport_instance = transport + self.transport_name = 'websocket' + else: + self._warn('unexpected engine.io packet') + except Exception: + pass + self._debug('[transport selected] %s', self.transport_name) + + def _reset_heartbeat(self): + try: + self._heartbeat_thread.halt() + hurried = self._heartbeat_thread.hurried + except AttributeError: + hurried = False + ping_interval = self._engineIO_session.ping_interval + if self.transport_name.endswith('-polling'): + # Use ping/pong to unblock recv for polling transport + hurry_interval_in_seconds = self._hurry_interval_in_seconds + else: + # Use timeout to unblock recv for websocket transport + hurry_interval_in_seconds = ping_interval + self._heartbeat_thread = HeartbeatThread( + send_heartbeat=self._ping, + relax_interval_in_seconds=ping_interval, + hurry_interval_in_seconds=hurry_interval_in_seconds) + self._heartbeat_thread.start() + if hurried: + self._heartbeat_thread.hurry() + self._debug('[heartbeat reset]') + + def _connect_namespaces(self): pass - def message(self, data='', callback=None): - self._transport.message(self.path, data, callback) + def _get_transport(self, transport_name): + SelectedTransport = { + 'xhr-polling': XHR_PollingTransport, + 'websocket': WebsocketTransport, + }[transport_name] + return SelectedTransport( + self._http_session, self._is_secure, self._url, + self._engineIO_session) - def emit(self, event, *args, **kw): - callback, args = find_callback(args, kw) - self._transport.emit(self.path, event, args, callback) + def __enter__(self): + return self - def disconnect(self): - self._transport.disconnect(self.path) + def __exit__(self, *exception_pack): + self._close() + + def __del__(self): + self._close() + + # Define + + def define(self, Namespace): + self._namespace = namespace = Namespace(self) + return namespace def on(self, event, callback): - 'Define a callback to handle a custom event emitted by the server' - self._callback_by_event[event] = callback - - def on_connect(self): - 'Called after server connects; you can override this method' - pass - - def on_disconnect(self): - 'Called after server disconnects; you can override this method' - pass - - def on_heartbeat(self): - 'Called after server sends a heartbeat; you can override this method' - pass - - def on_message(self, data): - 'Called after server sends a message; you can override this method' - pass - - def on_event(self, event, *args): - """ - Called after server sends an event; you can override this method. - Called only if a custom event handler does not exist, - such as one defined by namespace.on('my_event', my_function). - """ - callback, args = find_callback(args) - if callback: - callback(args) - - def on_error(self, reason, advice): - 'Called after server sends an error; you can override this method' - pass - - def on_noop(self): - 'Called after server sends a noop; you can override this method' - pass - - def on_open(self, *args): - pass - - def on_close(self, *args): - pass - - def on_retry(self, *args): - pass - - def on_reconnect(self, *args): - pass - - def _find_event_callback(self, event): - # Check callbacks defined by on() try: - return self._callback_by_event[event] - except KeyError: + namespace = self.get_namespace() + except PacketError: + namespace = self.define(EngineIONamespace) + return namespace.on(event, callback) + + def get_namespace(self): + try: + return self._namespace + except AttributeError: + raise PacketError('undefined engine.io namespace') + + # Act + + def send(self, engineIO_packet_data): + self._message(engineIO_packet_data) + + def _open(self): + engineIO_packet_type = 0 + self._transport_instance.send_packet(engineIO_packet_type) + + def _close(self): + self._wants_to_close = True + try: + self._heartbeat_thread.halt() + except AttributeError: pass - # Convert connect to reconnect if we have seen connect already - if event == 'connect': - if not self._was_connected: - self._was_connected = True - else: - event = 'reconnect' - # Check callbacks defined explicitly or use on_event() - return getattr( - self, - 'on_' + event.replace(' ', '_'), - lambda *args: self.on_event(event, args)) + if not self._opened: + return + engineIO_packet_type = 1 + try: + self._transport_instance.send_packet(engineIO_packet_type) + except (TimeoutError, ConnectionError): + pass + self._opened = False + + def _ping(self, engineIO_packet_data=''): + engineIO_packet_type = 2 + self._transport_instance.send_packet( + engineIO_packet_type, engineIO_packet_data) + + def _pong(self, engineIO_packet_data=''): + engineIO_packet_type = 3 + self._transport_instance.send_packet( + engineIO_packet_type, engineIO_packet_data) + + @retry + def _message(self, engineIO_packet_data, with_transport_instance=False): + engineIO_packet_type = 4 + if with_transport_instance: + transport = self._transport_instance + else: + transport = self._transport + transport.send_packet(engineIO_packet_type, engineIO_packet_data) + self._debug('[socket.io packet sent] %s', engineIO_packet_data) + + def _upgrade(self): + engineIO_packet_type = 5 + self._transport_instance.send_packet(engineIO_packet_type) + + def _noop(self): + engineIO_packet_type = 6 + self._transport_instance.send_packet(engineIO_packet_type) + + # React + + def wait(self, seconds=None, **kw): + 'Wait in a loop and react to events as defined in the namespaces' + # Use ping/pong to unblock recv for polling transport + self._heartbeat_thread.hurry() + # Use timeout to unblock recv for websocket transport + self._transport.set_timeout(seconds=1) + # Listen + warning_screen = self._yield_warning_screen(seconds) + for elapsed_time in warning_screen: + if self._should_stop_waiting(**kw): + break + try: + try: + self._process_packets() + except TimeoutError: + pass + except ConnectionError as e: + self._opened = False + try: + warning = Exception('[connection error] %s' % e) + warning_screen.throw(warning) + except StopIteration: + self._warn(warning) + try: + namespace = self.get_namespace() + namespace.on_disconnect() + except PacketError: + pass + self._heartbeat_thread.relax() + self._transport.set_timeout() + + def _should_stop_waiting(self): + return self._wants_to_close + + def _process_packets(self): + for engineIO_packet in self._transport.recv_packet(): + try: + self._process_packet(engineIO_packet) + except PacketError as e: + self._warn('[packet error] %s', e) + + def _process_packet(self, packet): + engineIO_packet_type, engineIO_packet_data = packet + # Launch callbacks + namespace = self.get_namespace() + try: + delegate = { + 0: self._on_open, + 1: self._on_close, + 2: self._on_ping, + 3: self._on_pong, + 4: self._on_message, + 5: self._on_upgrade, + 6: self._on_noop, + }[engineIO_packet_type] + except KeyError: + raise PacketError( + 'unexpected engine.io packet type (%s)' % engineIO_packet_type) + delegate(engineIO_packet_data, namespace) + if engineIO_packet_type is 4: + return engineIO_packet_data + + def _on_open(self, data, namespace): + namespace._find_packet_callback('open')() + + def _on_close(self, data, namespace): + namespace._find_packet_callback('close')() + + def _on_ping(self, data, namespace): + self._pong(data) + namespace._find_packet_callback('ping')(data) + + def _on_pong(self, data, namespace): + namespace._find_packet_callback('pong')(data) + + def _on_message(self, data, namespace): + namespace._find_packet_callback('message')(data) + + def _on_upgrade(self, data, namespace): + namespace._find_packet_callback('upgrade')() + + def _on_noop(self, data, namespace): + namespace._find_packet_callback('noop')() -class LoggingNamespace(BaseNamespace): - - def _log(self, level, msg, *attrs): - _log.log(level, '%s: %s' % (self._transport._url, msg), *attrs) - - def on_connect(self): - self._log(logging.DEBUG, '%s [connect]', self.path) - super(LoggingNamespace, self).on_connect() - - def on_disconnect(self): - self._log(logging.DEBUG, '%s [disconnect]', self.path) - super(LoggingNamespace, self).on_disconnect() - - def on_heartbeat(self): - self._log(logging.DEBUG, '%s [heartbeat]', self.path) - super(LoggingNamespace, self).on_heartbeat() - - def on_message(self, data): - self._log(logging.INFO, '%s [message] %s', self.path, data) - super(LoggingNamespace, self).on_message(data) - - def on_event(self, event, *args): - callback, args = find_callback(args) - arguments = [repr(_) for _ in args] - if callback: - arguments.append('callback(*args)') - self._log(logging.INFO, '%s [event] %s(%s)', self.path, event, - ', '.join(arguments)) - super(LoggingNamespace, self).on_event(event, args) - - def on_error(self, reason, advice): - self._log(logging.INFO, '%s [error] %s', self.path, advice) - super(LoggingNamespace, self).on_error(reason, advice) - - def on_noop(self): - self._log(logging.INFO, '%s [noop]', self.path) - super(LoggingNamespace, self).on_noop() - - def on_open(self, *args): - self._log(logging.INFO, '%s [open] %s', self.path, args) - super(LoggingNamespace, self).on_open(*args) - - def on_close(self, *args): - self._log(logging.INFO, '%s [close] %s', self.path, args) - super(LoggingNamespace, self).on_close(*args) - - def on_retry(self, *args): - self._log(logging.INFO, '%s [retry] %s', self.path, args) - super(LoggingNamespace, self).on_retry(*args) - - def on_reconnect(self, *args): - self._log(logging.INFO, '%s [reconnect] %s', self.path, args) - super(LoggingNamespace, self).on_reconnect(*args) - - -class SocketIO(object): - +class SocketIO(EngineIO): """Create a socket.io client that connects to a socket.io server at the specified host and port. @@ -186,7 +311,8 @@ class SocketIO(object): - Specify desired transports=['websocket', 'xhr-polling']. - Pass query params, headers, cookies, proxies as keyword arguments. - SocketIO('localhost', 8000, + SocketIO( + 'localhost', 8000, params={'q': 'qqq'}, headers={'Authorization': 'Basic ' + b64encode('username:password')}, cookies={'a': 'aaa'}, @@ -194,334 +320,196 @@ class SocketIO(object): """ def __init__( - self, host, port=None, Namespace=None, + self, host, port=None, Namespace=SocketIONamespace, wait_for_connection=True, transports=TRANSPORTS, - resource='socket.io', **kw): - self.is_secure, self._base_url = _parse_host(host, port, resource) - self.wait_for_connection = wait_for_connection + resource='socket.io', hurry_interval_in_seconds=1, **kw): self._namespace_by_path = {} - self._client_supported_transports = transports - self._kw = kw - if Namespace: - self.define(Namespace) + self._callback_by_ack_id = {} + self._ack_id = 0 + super(SocketIO, self).__init__( + host, port, Namespace, wait_for_connection, transports, + resource, hurry_interval_in_seconds, **kw) - def _log(self, level, msg, *attrs): - _log.log(level, '%s: %s' % (self._base_url, msg), *attrs) - - def __enter__(self): - return self - - def __exit__(self, *exception_pack): - self.disconnect() - - def __del__(self): - self.disconnect() - - def define(self, Namespace, path=''): - if path: - self._transport.connect(path) - namespace = Namespace(self._transport, path) - self._namespace_by_path[path] = namespace - return namespace - - def on(self, event, callback, path=''): - if path not in self._namespace_by_path: - self.define(BaseNamespace, path) - return self.get_namespace(path).on(event, callback) - - def message(self, data='', callback=None, path=''): - self._transport.message(path, data, callback) - - def emit(self, event, *args, **kw): - path = kw.get('path', '') - callback, args = find_callback(args, kw) - self._transport.emit(path, event, args, callback) - - def wait(self, seconds=None, for_callbacks=False, on_error=None, error_args=None): - """Wait in a loop and process events as defined in the namespaces. - - - Omit seconds, i.e. call wait() without arguments, to wait forever. - """ - warning_screen = _yield_warning_screen(seconds) - timeout = min(self._heartbeat_interval, seconds) - for elapsed_time in warning_screen: - if self._stop_waiting(for_callbacks): - break - try: - try: - self._process_events(timeout) - except TimeoutError: - pass - next(self._heartbeat_pacemaker) - except ConnectionError as e: - if on_error is not None and callable(on_error): - on_error(e, error_args) - try: - warning = Exception('[connection error] %s' % e) - warning_screen.throw(warning) - except StopIteration: - self._log(logging.WARNING, warning) - try: - namespace = self._namespace_by_path[''] - namespace.on_disconnect() - except KeyError: - pass - - def _process_events(self, timeout=None): - for packet in self._transport.recv_packet(timeout): - try: - self._process_packet(packet) - except PacketError as e: - self._log(logging.WARNING, '[packet error] %s', e) - - def _process_packet(self, packet): - code, packet_id, path, data = packet - namespace = self.get_namespace(path or '') - delegate = self._get_delegate(code, packet) - delegate(packet, namespace._find_event_callback) - - def _stop_waiting(self, for_callbacks): - # Use __transport to make sure that we do not reconnect inadvertently - if for_callbacks and not self.__transport.has_ack_callback: - return True - if self.__transport._wants_to_disconnect: - return True - return False - - def wait_for_callbacks(self, seconds=None): - self.wait(seconds, for_callbacks=True) - - def disconnect(self, path=''): - try: - self._transport.disconnect(path) - except ReferenceError: - pass - try: - namespace = self._namespace_by_path[path] - namespace.on_disconnect() - del self._namespace_by_path[path] - except KeyError: - pass + # Connect @property def connected(self): - try: - transport = self.__transport - except AttributeError: - return False - else: - return transport.connected + return self._opened - @property - def _transport(self): - try: - if self.connected: - return self.__transport - except AttributeError as e: - pass - socketIO_session = self._get_socketIO_session() - supported_transports = self._get_supported_transports(socketIO_session) - self._heartbeat_pacemaker = self._make_heartbeat_pacemaker( - heartbeat_timeout=socketIO_session.heartbeat_timeout) - next(self._heartbeat_pacemaker) - warning_screen = _yield_warning_screen(seconds=None) - for elapsed_time in warning_screen: - try: - self._transport_name = supported_transports.pop(0) - except IndexError: - raise ConnectionError('Could not negotiate a transport') - try: - self.__transport = self._get_transport( - socketIO_session, self._transport_name) - break - except ConnectionError: - pass + def _connect_namespaces(self): for path, namespace in self._namespace_by_path.items(): - namespace._transport = self.__transport + namespace._transport = self._transport_instance if path: - self.__transport.connect(path) - return self.__transport + self.connect(path, with_transport_instance=True) - def _get_socketIO_session(self): - warning_screen = _yield_warning_screen(seconds=None) - for elapsed_time in warning_screen: - try: - return _get_socketIO_session( - self.is_secure, self._base_url, **self._kw) - except ConnectionError as e: - if not self.wait_for_connection: - raise - warning = Exception('[waiting for connection] %s' % e) - try: - warning_screen.throw(warning) - except StopIteration: - self._log(logging.WARNING, warning) + def __exit__(self, *exception_pack): + self.disconnect() + super(SocketIO, self).__exit__(*exception_pack) - def _get_supported_transports(self, session): - self._log( - logging.DEBUG, '[transports available] %s', - ' '.join(session.server_supported_transports)) - supported_transports = [ - x for x in self._client_supported_transports if - x in session.server_supported_transports] - if not supported_transports: - raise SocketIOError(' '.join([ - 'could not negotiate a transport:', - 'client supports %s but' % ', '.join( - self._client_supported_transports), - 'server supports %s' % ', '.join( - session.server_supported_transports), - ])) - return supported_transports + def __del__(self): + self.disconnect() + super(SocketIO, self).__del__() - def _get_transport(self, session, transport_name): - self._log(logging.DEBUG, '[transport chosen] %s', transport_name) - return { - 'websocket': _WebsocketTransport, - 'xhr-polling': _XHR_PollingTransport, - 'jsonp-polling': _JSONP_PollingTransport, - }[transport_name](session, self.is_secure, self._base_url, **self._kw) + # Define - def _make_heartbeat_pacemaker(self, heartbeat_timeout): - self._heartbeat_interval = heartbeat_timeout / 2 - heartbeat_time = time.time() - while True: - yield - if time.time() - heartbeat_time > self._heartbeat_interval: - heartbeat_time = time.time() - self._transport.send_heartbeat() + def define(self, Namespace, path=''): + self._namespace_by_path[path] = namespace = Namespace(self, path) + if path: + self.connect(path) + self.wait(for_connect=True) + return namespace + + def on(self, event, callback, path=''): + try: + namespace = self.get_namespace(path) + except PacketError: + namespace = self.define(SocketIONamespace, path) + return namespace.on(event, callback) def get_namespace(self, path=''): try: return self._namespace_by_path[path] except KeyError: - raise PacketError('unhandled namespace path (%s)' % path) + raise PacketError('undefined socket.io namespace (%s)' % path) - def _get_delegate(self, code, packet): + # Act + + def connect(self, path, with_transport_instance=False): + socketIO_packet_type = 0 + socketIO_packet_data = format_socketIO_packet_data(path) + self._message( + str(socketIO_packet_type) + socketIO_packet_data, + with_transport_instance) + + def disconnect(self, path=''): + if not path or not self._opened: + self._close() + elif path: + socketIO_packet_type = 1 + socketIO_packet_data = format_socketIO_packet_data(path) + try: + self._message(str(socketIO_packet_type) + socketIO_packet_data) + except (TimeoutError, ConnectionError): + pass try: - return { - '0': self._on_disconnect, - '1': self._on_connect, - '2': self._on_heartbeat, - '3': self._on_message, - '4': self._on_json, - '5': self._on_event, - '6': self._on_ack, - '7': self._on_error, - '8': self._on_noop, - '': self._on_noop - }[code] + namespace = self._namespace_by_path.pop(path) + namespace.on_disconnect() except KeyError: - raise PacketError('unexpected code ({}): {}'.format([code], packet)) + pass - def _on_disconnect(self, packet, find_event_callback): - find_event_callback('disconnect')() + def emit(self, event, *args, **kw): + path = kw.get('path', '') + callback, args = find_callback(args, kw) + ack_id = self._set_ack_callback(callback) if callback else None + args = [event] + list(args) + socketIO_packet_type = 2 + socketIO_packet_data = format_socketIO_packet_data(path, ack_id, args) + self._message(str(socketIO_packet_type) + socketIO_packet_data) - def _on_connect(self, packet, find_event_callback): - find_event_callback('connect')() + def send(self, data='', callback=None, **kw): + path = kw.get('path', '') + args = [data] + if callback: + args.append(callback) + self.emit('message', *args, path=path) - def _on_heartbeat(self, packet, find_event_callback): - find_event_callback('heartbeat')() + def _ack(self, path, ack_id, *args): + socketIO_packet_type = 3 + socketIO_packet_data = format_socketIO_packet_data(path, ack_id, args) + self._message(str(socketIO_packet_type) + socketIO_packet_data) - def _on_message(self, packet, find_event_callback): - code, packet_id, path, data = packet - args = data - self._send_args('message', args, path, packet_id, find_event_callback) + # React - def _send_args(self, event, args, path, packet_id, find_event_callback): - ev_args = [args] - if packet_id: - ack = self._prepare_to_send_ack(path, packet_id) - ev_args.append(ack) - find_event_callback(event)(*ev_args) + def wait_for_callbacks(self, seconds=None): + self.wait(seconds, for_callbacks=True) - def _on_json(self, packet, find_event_callback): - code, packet_id, path, data = packet - args = json.loads(data) - self._send_args('message', args, path, packet_id, find_event_callback) + def _should_stop_waiting(self, for_connect=False, for_callbacks=False): + if for_connect: + for namespace in self._namespace_by_path.values(): + is_namespace_connected = getattr( + namespace, '_connected', False) + if not is_namespace_connected: + return False + return True + if for_callbacks and not self._has_ack_callback: + return True + return super(SocketIO, self)._should_stop_waiting() - def _on_event(self, packet, find_event_callback): - code, packet_id, path, data = packet - value_by_name = json.loads(data) - event = value_by_name['name'] - args = value_by_name.get('args', []) - self._send_args(event, args, path, packet_id, find_event_callback) - - def _on_ack(self, packet, find_event_callback): - code, packet_id, path, data = packet - data_parts = data.split('+', 1) - packet_id = data_parts[0] + def _process_packet(self, packet): + engineIO_packet_data = super(SocketIO, self)._process_packet(packet) + if engineIO_packet_data is None: + return + self._debug('[socket.io packet received] %s', engineIO_packet_data) + socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) + socketIO_packet_data = engineIO_packet_data[1:] + # Launch callbacks + path = get_namespace_path(socketIO_packet_data) + namespace = self.get_namespace(path) try: - ack_callback = self._transport.get_ack_callback(packet_id) + delegate = { + 0: self._on_connect, + 1: self._on_disconnect, + 2: self._on_event, + 3: self._on_ack, + 4: self._on_error, + 5: self._on_binary_event, + 6: self._on_binary_ack, + }[socketIO_packet_type] + except KeyError: + raise PacketError( + 'unexpected socket.io packet type (%s)' % socketIO_packet_type) + delegate(socketIO_packet_data, namespace) + return socketIO_packet_data + + def _on_connect(self, data, namespace): + namespace._connected = True + namespace._find_packet_callback('connect')() + + def _on_disconnect(self, data, namespace): + namespace._connected = False + namespace._find_packet_callback('disconnect')() + + def _on_event(self, data, namespace): + data_parsed = parse_socketIO_packet_data(data) + args = data_parsed.args + try: + event = args.pop(0) + except IndexError: + raise PacketError('missing event name') + if data_parsed.ack_id is not None: + args.append(self._prepare_to_send_ack( + data_parsed.path, data_parsed.ack_id)) + namespace._find_packet_callback(event)(*args) + + def _on_ack(self, data, namespace): + data_parsed = parse_socketIO_packet_data(data) + try: + ack_callback = self._get_ack_callback(data_parsed.ack_id) except KeyError: return - args = json.loads(data_parts[1]) if len(data_parts) > 1 else [] - ack_callback(args) + ack_callback(*data_parsed.args) - def _on_error(self, packet, find_event_callback): - code, packet_id, path, data = packet - reason, advice = data.split('+', 1) - find_event_callback('error')(reason, advice) + def _on_error(self, data, namespace): + namespace._find_packet_callback('error')(data) - def _on_noop(self, packet, find_event_callback): - find_event_callback('noop')() + def _on_binary_event(self, data, namespace): + self._warn('[not implemented] binary event') - def _prepare_to_send_ack(self, path, packet_id): + def _on_binary_ack(self, data, namespace): + self._warn('[not implemented] binary ack') + + def _prepare_to_send_ack(self, path, ack_id): 'Return function that acknowledges the server' - return lambda *args: self._transport.ack(path, packet_id, *args) + return lambda *args: self._ack(path, ack_id, *args) + def _set_ack_callback(self, callback): + self._ack_id += 1 + self._callback_by_ack_id[self._ack_id] = callback + return self._ack_id -def find_callback(args, kw=None): - 'Return callback whether passed as a last argument or as a keyword' - if args and callable(args[-1]): - return args[-1], args[:-1] - try: - return kw['callback'], args - except (KeyError, TypeError): - return None, args + def _get_ack_callback(self, ack_id): + return self._callback_by_ack_id.pop(ack_id) - -def _parse_host(host, port, resource): - if not host.startswith('http'): - host = 'http://' + host - url_pack = parse_url(host) - is_secure = url_pack.scheme == 'https' - port = port or url_pack.port or (443 if is_secure else 80) - base_url = '%s:%d%s/%s/%s' % ( - url_pack.hostname, port, url_pack.path, resource, PROTOCOL_VERSION) - return is_secure, base_url - - -def _yield_warning_screen(seconds=None): - last_warning = None - for elapsed_time in _yield_elapsed_time(seconds): - try: - yield elapsed_time - except Exception as warning: - warning = str(warning) - if last_warning != warning: - last_warning = warning - _log.warn(warning) - time.sleep(RETRY_INTERVAL_IN_SECONDS) - - -def _yield_elapsed_time(seconds=None): - start_time = time.time() - if seconds is None: - while True: - yield time.time() - start_time - while time.time() - start_time < seconds: - yield time.time() - start_time - - -def _get_socketIO_session(is_secure, base_url, **kw): - server_url = '%s://%s/' % ('https' if is_secure else 'http', base_url) - try: - response = _get_response(requests.get, server_url, **kw) - except TimeoutError as e: - raise ConnectionError(e) - response_parts = _get_text(response).split(':') - return _SocketIOSession( - id=response_parts[0], - heartbeat_timeout=int(response_parts[1]), - server_supported_transports=response_parts[3].split(',')) + @property + def _has_ack_callback(self): + return True if self._callback_by_ack_id else False diff --git a/socketIO_client/heartbeats.py b/socketIO_client/heartbeats.py new file mode 100644 index 0000000..f79c5ca --- /dev/null +++ b/socketIO_client/heartbeats.py @@ -0,0 +1,52 @@ +import logging +from threading import Thread, Event + +from .exceptions import ConnectionError, TimeoutError + + +class HeartbeatThread(Thread): + + daemon = True + + def __init__( + self, send_heartbeat, + relax_interval_in_seconds, + hurry_interval_in_seconds): + super(HeartbeatThread, self).__init__() + self._send_heartbeat = send_heartbeat + self._relax_interval_in_seconds = relax_interval_in_seconds + self._hurry_interval_in_seconds = hurry_interval_in_seconds + self._adrenaline = Event() + self._rest = Event() + self._halt = Event() + + def run(self): + try: + while not self._halt.is_set(): + try: + self._send_heartbeat() + except TimeoutError: + pass + if self._adrenaline.is_set(): + interval_in_seconds = self._hurry_interval_in_seconds + else: + interval_in_seconds = self._relax_interval_in_seconds + self._rest.wait(interval_in_seconds) + except ConnectionError: + logging.debug('[heartbeat connection error]') + + def relax(self): + self._adrenaline.clear() + + def hurry(self): + self._adrenaline.set() + self._rest.set() + self._rest.clear() + + @property + def hurried(self): + return self._adrenaline.is_set() + + def halt(self): + self._rest.set() + self._halt.set() diff --git a/socketIO_client/logs.py b/socketIO_client/logs.py new file mode 100644 index 0000000..32b5d54 --- /dev/null +++ b/socketIO_client/logs.py @@ -0,0 +1,42 @@ +import logging +import time + + +class LoggingMixin(object): + + def _log(self, level, msg, *attrs): + logging.log(level, '%s %s' % (self._log_name, msg), *attrs) + + def _debug(self, msg, *attrs): + self._log(logging.DEBUG, msg, *attrs) + + def _info(self, msg, *attrs): + self._log(logging.INFO, msg, *attrs) + + def _warn(self, msg, *attrs): + self._log(logging.WARNING, msg, *attrs) + + def _yield_warning_screen(self, seconds=None): + last_warning = None + for elapsed_time in _yield_elapsed_time(seconds): + try: + yield elapsed_time + except Exception as warning: + warning = str(warning) + if last_warning != warning: + last_warning = warning + self._warn(warning) + time.sleep(1) + + +def _yield_elapsed_time(seconds=None): + start_time = time.time() + if seconds is None: + while True: + yield _get_elapsed_time(start_time) + while _get_elapsed_time(start_time) < seconds: + yield _get_elapsed_time(start_time) + + +def _get_elapsed_time(start_time): + return time.time() - start_time diff --git a/socketIO_client/namespaces.py b/socketIO_client/namespaces.py new file mode 100644 index 0000000..e78529a --- /dev/null +++ b/socketIO_client/namespaces.py @@ -0,0 +1,224 @@ +from .logs import LoggingMixin + + +class EngineIONamespace(LoggingMixin): + 'Define engine.io client behavior' + + def __init__(self, io): + self._io = io + self._callback_by_event = {} + self._log_name = io._url + self.initialize() + + def initialize(self): + """Initialize custom variables here. + You can override this method.""" + + def on(self, event, callback): + 'Define a callback to handle an event emitted by the server' + self._callback_by_event[event] = callback + + def send(self, data): + 'Send a message' + self._io.send(data) + + def on_open(self): + """Called after engine.io connects. + You can override this method.""" + + def on_close(self): + """Called after engine.io disconnects. + You can override this method.""" + + def on_ping(self, data): + """Called after engine.io sends a ping packet. + You can override this method.""" + + def on_pong(self, data): + """Called after engine.io sends a pong packet. + You can override this method.""" + + def on_message(self, data): + """Called after engine.io sends a message packet. + You can override this method.""" + + def on_upgrade(self): + """Called after engine.io sends an upgrade packet. + You can override this method.""" + + def on_noop(self): + """Called after engine.io sends a noop packet. + You can override this method.""" + + def _find_packet_callback(self, event): + # Check callbacks defined by on() + try: + return self._callback_by_event[event] + except KeyError: + pass + # Check callbacks defined explicitly + return getattr(self, 'on_' + event) + + +class SocketIONamespace(EngineIONamespace): + 'Define socket.io client behavior' + + def __init__(self, io, path): + self.path = path + super(SocketIONamespace, self).__init__(io) + + def connect(self): + self._io.connect(self.path) + + def disconnect(self): + self._io.disconnect(self.path) + + def emit(self, event, *args, **kw): + self._io.emit(event, path=self.path, *args, **kw) + + def send(self, data='', callback=None): + self._io.send(data, callback) + + def on_connect(self): + """Called after socket.io connects. + You can override this method.""" + + def on_reconnect(self): + """Called after socket.io reconnects. + You can override this method.""" + + def on_disconnect(self): + """Called after socket.io disconnects. + You can override this method.""" + + def on_event(self, event, *args): + """ + Called if there is no matching event handler. + You can override this method. + There are three ways to define an event handler: + + - Call socketIO.on() + + socketIO = SocketIO('localhost', 8000) + socketIO.on('my_event', my_function) + + - Call namespace.on() + + namespace = socketIO.get_namespace() + namespace.on('my_event', my_function) + + - Define namespace.on_xxx + + class Namespace(SocketIONamespace): + + def on_my_event(self, *args): + my_function(*args) + + socketIO.define(Namespace)""" + + def on_error(self, data): + """Called after socket.io sends an error packet. + You can override this method.""" + + def _find_packet_callback(self, event): + # Interpret events + if event == 'connect': + if not hasattr(self, '_was_connected'): + self._was_connected = True + else: + event = 'reconnect' + # Check callbacks defined by on() + try: + return self._callback_by_event[event] + except KeyError: + pass + # Check callbacks defined explicitly or use on_event() + return getattr( + self, 'on_' + event.replace(' ', '_'), + lambda *args: self.on_event(event, *args)) + + +class LoggingEngineIONamespace(EngineIONamespace): + + def on_open(self): + self._debug('[engine.io open]') + super(LoggingEngineIONamespace, self).on_open() + + def on_close(self): + self._debug('[engine.io close]') + super(LoggingEngineIONamespace, self).on_close() + + def on_ping(self, data): + self._debug('[engine.io ping] %s', data) + super(LoggingEngineIONamespace, self).on_ping(data) + + def on_pong(self, data): + self._debug('[engine.io pong] %s', data) + super(LoggingEngineIONamespace, self).on_pong(data) + + def on_message(self, data): + self._debug('[engine.io message] %s', data) + super(LoggingEngineIONamespace, self).on_message(data) + + def on_upgrade(self): + self._debug('[engine.io upgrade]') + super(LoggingEngineIONamespace, self).on_upgrade() + + def on_noop(self): + self._debug('[engine.io noop]') + super(LoggingEngineIONamespace, self).on_noop() + + def on_event(self, event, *args): + callback, args = find_callback(args) + arguments = [repr(_) for _ in args] + if callback: + arguments.append('callback(*args)') + self._info('[engine.io event] %s(%s)', event, ', '.join(arguments)) + super(LoggingEngineIONamespace, self).on_event(event, *args) + + +class LoggingSocketIONamespace(SocketIONamespace, LoggingEngineIONamespace): + + def on_connect(self): + self._debug( + '%s[socket.io connect]', _make_logging_header(self.path)) + super(LoggingSocketIONamespace, self).on_connect() + + def on_reconnect(self): + self._debug( + '%s[socket.io reconnect]', _make_logging_header(self.path)) + super(LoggingSocketIONamespace, self).on_reconnect() + + def on_disconnect(self): + self._debug( + '%s[socket.io disconnect]', _make_logging_header(self.path)) + super(LoggingSocketIONamespace, self).on_disconnect() + + def on_event(self, event, *args): + callback, args = find_callback(args) + arguments = [repr(_) for _ in args] + if callback: + arguments.append('callback(*args)') + self._info( + '%s[socket.io event] %s(%s)', _make_logging_header(self.path), + event, ', '.join(arguments)) + super(LoggingSocketIONamespace, self).on_event(event, *args) + + def on_error(self, data): + self._debug( + '%s[socket.io error] %s', _make_logging_header(self.path), data) + super(LoggingSocketIONamespace, self).on_error() + + +def find_callback(args, kw=None): + 'Return callback whether passed as a last argument or as a keyword' + if args and callable(args[-1]): + return args[-1], args[:-1] + try: + return kw['callback'], args + except (KeyError, TypeError): + return None, args + + +def _make_logging_header(path): + return path + ' ' if path else '' diff --git a/socketIO_client/parsers.py b/socketIO_client/parsers.py new file mode 100644 index 0000000..8da8ee8 --- /dev/null +++ b/socketIO_client/parsers.py @@ -0,0 +1,137 @@ +import json +from collections import namedtuple + +from .symmetries import ( + decode_string, encode_string, get_byte, get_character, parse_url) + + +EngineIOSession = namedtuple('EngineIOSession', [ + 'id', 'ping_interval', 'ping_timeout', 'transport_upgrades']) +SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'args']) + + +def parse_host(host, port, resource): + if not host.startswith('http'): + host = 'http://' + host + url_pack = parse_url(host) + is_secure = url_pack.scheme == 'https' + port = port or url_pack.port or (443 if is_secure else 80) + url = '%s:%d%s/%s' % (url_pack.hostname, port, url_pack.path, resource) + return is_secure, url + + +def parse_engineIO_session(engineIO_packet_data): + d = json.loads(decode_string(engineIO_packet_data)) + return EngineIOSession( + id=d['sid'], + ping_interval=d['pingInterval'] / float(1000), + ping_timeout=d['pingTimeout'] / float(1000), + transport_upgrades=d['upgrades']) + + +def encode_engineIO_content(engineIO_packets): + content = bytearray() + for packet_type, packet_data in engineIO_packets: + packet_text = format_packet_text(packet_type, packet_data) + content.extend(_make_packet_prefix(packet_text) + packet_text) + return content + + +def decode_engineIO_content(content): + content_index = 0 + content_length = len(content) + while content_index < content_length: + try: + content_index, packet_length = _read_packet_length( + content, content_index) + except IndexError: + break + content_index, packet_text = _read_packet_text( + content, content_index, packet_length) + engineIO_packet_type, engineIO_packet_data = parse_packet_text( + packet_text) + yield engineIO_packet_type, engineIO_packet_data + + +def format_socketIO_packet_data(path=None, ack_id=None, args=None): + socketIO_packet_data = json.dumps(args, ensure_ascii=False) if args else '' + if ack_id is not None: + socketIO_packet_data = str(ack_id) + socketIO_packet_data + if path: + socketIO_packet_data = path + ',' + socketIO_packet_data + return socketIO_packet_data + + +def parse_socketIO_packet_data(socketIO_packet_data): + data = decode_string(socketIO_packet_data) + if data.startswith('/'): + try: + path, data = data.split(',', 1) + except ValueError: + path = data + data = '' + else: + path = '' + try: + ack_id_string, data = data.split('[', 1) + data = '[' + data + ack_id = int(ack_id_string) + except (ValueError, IndexError): + ack_id = None + try: + args = json.loads(data) + except ValueError: + args = [] + return SocketIOData(path=path, ack_id=ack_id, args=args) + + +def format_packet_text(packet_type, packet_data): + return encode_string(str(packet_type) + packet_data) + + +def parse_packet_text(packet_text): + packet_type = int(get_character(packet_text, 0)) + packet_data = packet_text[1:] + return packet_type, packet_data + + +def get_namespace_path(socketIO_packet_data): + if not socketIO_packet_data.startswith(b'/'): + return '' + # Loop incrementally in case there is binary data + parts = [] + for i in range(len(socketIO_packet_data)): + character = get_character(socketIO_packet_data, i) + if ',' == character: + break + parts.append(character) + return ''.join(parts) + + +def _make_packet_prefix(packet): + length_string = str(len(packet)) + header_digits = bytearray([0]) + for i in range(len(length_string)): + header_digits.append(ord(length_string[i]) - 48) + header_digits.append(255) + return header_digits + + +def _read_packet_length(content, content_index): + while get_byte(content, content_index) != 0: + content_index += 1 + content_index += 1 + packet_length_string = '' + byte = get_byte(content, content_index) + while byte != 255: + packet_length_string += str(byte) + content_index += 1 + byte = get_byte(content, content_index) + return content_index, int(packet_length_string) + + +def _read_packet_text(content, content_index, packet_length): + while get_byte(content, content_index) == 255: + content_index += 1 + packet_text = content[content_index:content_index + packet_length] + return content_index + packet_length, packet_text diff --git a/socketIO_client/symmetries.py b/socketIO_client/symmetries.py index eceecdc..f100ec6 100644 --- a/socketIO_client/symmetries.py +++ b/socketIO_client/symmetries.py @@ -1,5 +1,29 @@ -def _get_text(response): - try: - return response.text # requests 2.7.0 - except AttributeError: - return response.content # requests 0.8.2 +import six +try: + from urllib import urlencode as format_query +except ImportError: + from urllib.parse import urlencode as format_query +try: + from urlparse import urlparse as parse_url +except ImportError: + from urllib.parse import urlparse as parse_url +try: + memoryview = memoryview +except NameError: + memoryview = buffer + + +def get_character(x, index): + return chr(get_byte(x, index)) + + +def get_byte(x, index): + return six.indexbytes(x, index) + + +def encode_string(x): + return x.encode('utf-8') + + +def decode_string(x): + return x.decode('utf-8') diff --git a/socketIO_client/tests.py b/socketIO_client/tests/__init__.py similarity index 67% rename from socketIO_client/tests.py rename to socketIO_client/tests/__init__.py index 400d626..bcfc00e 100644 --- a/socketIO_client/tests.py +++ b/socketIO_client/tests/__init__.py @@ -2,12 +2,11 @@ import logging import time from unittest import TestCase -from . import SocketIO, LoggingNamespace, find_callback -from .transports import TIMEOUT_IN_SECONDS +from .. import SocketIO, LoggingNamespace, find_callback HOST = 'localhost' -PORT = 8000 +PORT = 9000 DATA = 'xxx' PAYLOAD = {'xxx': 'yyy'} logging.basicConfig(level=logging.DEBUG) @@ -16,68 +15,23 @@ logging.basicConfig(level=logging.DEBUG) class BaseMixin(object): def setUp(self): + super(BaseMixin, self).setUp() self.called_on_response = False + self.wait_time_in_seconds = 1 def tearDown(self): - del self.socketIO - - def on_response(self, *args): - for arg in args: - if isinstance(arg, dict): - self.assertEqual(arg, PAYLOAD) - else: - self.assertEqual(arg, DATA) - self.called_on_response = True + super(BaseMixin, self).tearDown() + self.socketIO.disconnect() def test_disconnect(self): 'Disconnect' - self.socketIO.define(LoggingNamespace) + namespace = self.socketIO.define(Namespace) self.assertTrue(self.socketIO.connected) + self.assertFalse(namespace.called_on_disconnect) self.socketIO.disconnect() - self.assertFalse(self.socketIO.connected) - # Use context manager - with SocketIO(HOST, PORT, Namespace) as self.socketIO: - namespace = self.socketIO.get_namespace() - self.assertFalse(namespace.called_on_disconnect) - self.assertTrue(self.socketIO.connected) self.assertTrue(namespace.called_on_disconnect) self.assertFalse(self.socketIO.connected) - def test_message(self): - 'Message' - namespace = self.socketIO.define(Namespace) - self.socketIO.message() - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, 'message_response') - - def test_message_with_data(self): - 'Message with data' - namespace = self.socketIO.define(Namespace) - self.socketIO.message(DATA) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, DATA) - - def test_message_with_payload(self): - 'Message with payload' - namespace = self.socketIO.define(Namespace) - self.socketIO.message(PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, PAYLOAD) - - def test_message_with_callback(self): - 'Message with callback' - self.socketIO.define(LoggingNamespace) - self.socketIO.message(callback=self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - - def test_message_with_callback_with_data(self): - 'Message with callback with data' - self.socketIO.define(LoggingNamespace) - self.socketIO.message(DATA, self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - def test_emit(self): 'Emit' namespace = self.socketIO.define(Namespace) @@ -107,14 +61,12 @@ class BaseMixin(object): def test_emit_with_callback(self): 'Emit with callback' - self.socketIO.define(LoggingNamespace) self.socketIO.emit('emit_with_callback', self.on_response) self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_emit_with_callback_with_payload(self): 'Emit with callback with payload' - self.socketIO.define(LoggingNamespace) self.socketIO.emit( 'emit_with_callback_with_payload', self.on_response) self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) @@ -122,7 +74,6 @@ class BaseMixin(object): def test_emit_with_callback_with_multiple_payloads(self): 'Emit with callback with multiple payloads' - self.socketIO.define(LoggingNamespace) self.socketIO.emit( 'emit_with_callback_with_multiple_payloads', self.on_response) self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) @@ -135,20 +86,34 @@ class BaseMixin(object): self.socketIO.wait(self.wait_time_in_seconds) self.assertTrue(self.called_on_response) - def test_ack(self): - 'Trigger server callback' + def test_send(self): + 'Send' namespace = self.socketIO.define(Namespace) - self.socketIO.emit('ack', PAYLOAD) + self.socketIO.send() + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.response, 'message_response') + + def test_send_with_data(self): + 'Send with data' + namespace = self.socketIO.define(Namespace) + self.socketIO.send(DATA) + self.socketIO.wait(self.wait_time_in_seconds) + self.assertEqual(namespace.response, DATA) + + def test_ack(self): + 'Respond to a server callback request' + namespace = self.socketIO.define(Namespace) + self.socketIO.emit('trigger_server_expects_callback', PAYLOAD) self.socketIO.wait(self.wait_time_in_seconds) self.assertEqual(namespace.args_by_event, { - 'ack_response': (PAYLOAD,), - 'ack_callback_response': (PAYLOAD,), + 'server_expects_callback': (PAYLOAD,), + 'server_received_callback': (PAYLOAD,), }) def test_wait_with_disconnect(self): 'Exit loop when the client wants to disconnect' self.socketIO.define(Namespace) - self.socketIO.emit('wait_with_disconnect') + self.socketIO.disconnect() timeout_in_seconds = 5 start_time = time.time() self.socketIO.wait(timeout_in_seconds) @@ -168,52 +133,54 @@ class BaseMixin(object): }) def test_namespace_ack(self): - 'Trigger server callback' + 'Respond to a server callback request within a namespace' chat_namespace = self.socketIO.define(Namespace, '/chat') - chat_namespace.emit('ack', PAYLOAD) + chat_namespace.emit('trigger_server_expects_callback', PAYLOAD) self.socketIO.wait(self.wait_time_in_seconds) self.assertEqual(chat_namespace.args_by_event, { - 'ack_response': (PAYLOAD,), - 'ack_callback_response': (PAYLOAD,), + 'server_expects_callback': (PAYLOAD,), + 'server_received_callback': (PAYLOAD,), }) - -class Test_WebsocketTransport(TestCase, BaseMixin): - - def setUp(self): - super(Test_WebsocketTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['websocket']) - self.wait_time_in_seconds = 0.1 + def on_response(self, *args): + for arg in args: + if isinstance(arg, dict): + self.assertEqual(arg, PAYLOAD) + else: + self.assertEqual(arg, DATA) + self.called_on_response = True -class Test_XHR_PollingTransport(TestCase, BaseMixin): +class Test_XHR_PollingTransport(BaseMixin, TestCase): def setUp(self): super(Test_XHR_PollingTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['xhr-polling']) - self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 + self.socketIO = SocketIO(HOST, PORT, LoggingNamespace, transports=[ + 'xhr-polling'], verify=False) + self.assertEqual(self.socketIO.transport_name, 'xhr-polling') -class Test_JSONP_PollingTransport(TestCase, BaseMixin): +class Test_WebsocketTransport(BaseMixin, TestCase): def setUp(self): - super(Test_JSONP_PollingTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['jsonp-polling']) - self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 + super(Test_WebsocketTransport, self).setUp() + self.socketIO = SocketIO(HOST, PORT, LoggingNamespace, transports=[ + 'xhr-polling', 'websocket'], verify=False) + self.assertEqual(self.socketIO.transport_name, 'websocket') class Namespace(LoggingNamespace): def initialize(self): - self.response = None - self.args_by_event = {} self.called_on_disconnect = False + self.args_by_event = {} + self.response = None def on_disconnect(self): self.called_on_disconnect = True - def on_message(self, data): - self.response = data + def on_wait_with_disconnect_response(self): + self.disconnect() def on_event(self, event, *args): callback, args = find_callback(args) @@ -221,5 +188,5 @@ class Namespace(LoggingNamespace): callback(*args) self.args_by_event[event] = args - def on_wait_with_disconnect_response(self): - self.disconnect() + def on_message(self, data): + self.response = data diff --git a/socketIO_client/tests/index.html b/socketIO_client/tests/index.html new file mode 100644 index 0000000..f3c03f5 --- /dev/null +++ b/socketIO_client/tests/index.html @@ -0,0 +1,25 @@ + + diff --git a/socketIO_client/tests/proxy.js b/socketIO_client/tests/proxy.js new file mode 100644 index 0000000..6179788 --- /dev/null +++ b/socketIO_client/tests/proxy.js @@ -0,0 +1,36 @@ +var proxy = require('http-proxy').createProxyServer({ + target: {host: 'localhost', port: 9000} +}).on('error', function(err, req, res) { + console.log('[ERROR] %s', err); + res.end(); +}); +var server = require('http').createServer(function(req, res) { + console.log('[REQUEST.%s] %s', req.method, req.url); + console.log(req['headers']); + if (req.method == 'POST') { + var body = ''; + req.on('data', function (data) { + body += data; + }); + req.on('end', function () { + print_body('[REQUEST.BODY] ', body); + }); + } + var write = res.write; + res.write = function(data) { + print_body('[RESPONSE.BODY] ', data); + write.call(res, data); + } + proxy.web(req, res); +}); +function print_body(header, body) { + var text = String(body); + console.log(header + text); + if (text.charCodeAt(0) != 0) return; + for (var i = 0; i < text.length; i++) { + var character_code = text.charCodeAt(i); + console.log('body[%s] = %s = %s', i, text[i], character_code); + if (character_code == 65533) break; + } +} +server.listen(8000); diff --git a/socketIO_client/tests/serve.js b/socketIO_client/tests/serve.js new file mode 100644 index 0000000..05907b1 --- /dev/null +++ b/socketIO_client/tests/serve.js @@ -0,0 +1,98 @@ +// DEBUG=* node serve.js + +var argv = require('yargs').argv; +if (argv.secure) { + var fs = require('fs'); + var path = require('path'); + var app = require('https').createServer({ + key: fs.readFileSync(path.resolve(__dirname, 'ssl.key')), + cert: fs.readFileSync(path.resolve(__dirname, 'ssl.crt')) + }, serve); +} else { + var app = require('http').createServer(serve); +} +app.listen(9000); + +var io = require('socket.io')(app); +var PAYLOAD = {'xxx': 'yyy'}; + +io.on('connection', function(socket) { + socket.on('message', function(data, fn) { + if (fn) { + // Client requests callback + if (data) { + fn(data); + } else { + fn(); + } + } else if (typeof data === 'object') { + // Data has type object or is null + socket.json.send(data ? data : 'message_response'); + } else { + // Data has type string or is '' + socket.send(data ? data : 'message_response'); + } + }); + socket.on('emit', function() { + socket.emit('emit_response'); + }); + socket.on('emit_with_payload', function(payload) { + socket.emit('emit_with_payload_response', payload); + }); + socket.on('emit_with_multiple_payloads', function(payload1, payload2) { + socket.emit('emit_with_multiple_payloads_response', payload1, payload2); + }); + socket.on('emit_with_callback', function(fn) { + fn(); + }); + socket.on('emit_with_callback_with_payload', function(fn) { + fn(PAYLOAD); + }); + socket.on('emit_with_callback_with_multiple_payloads', function(fn) { + fn(PAYLOAD, PAYLOAD); + }); + socket.on('emit_with_event', function(payload) { + socket.emit('emit_with_event_response', payload); + }); + socket.on('trigger_server_expects_callback', function(payload) { + socket.emit('server_expects_callback', payload, function(payload) { + socket.emit('server_received_callback', payload); + }); + }); + socket.on('aaa', function() { + socket.emit('aaa_response', PAYLOAD); + }); + socket.on('bbb', function(payload, fn) { + if (fn) fn(payload); + }); +}); + +io.of('/chat').on('connection', function(socket) { + socket.on('emit_with_payload', function(payload) { + socket.emit('emit_with_payload_response', payload); + }); + socket.on('aaa', function() { + socket.emit('aaa_response', 'in chat'); + }); + socket.on('trigger_server_expects_callback', function(payload) { + socket.emit('server_expects_callback', payload, function(payload) { + socket.emit('server_received_callback', payload); + }); + }); +}); + +io.of('/news').on('connection', function(socket) { + socket.on('emit_with_payload', function(payload) { + socket.emit('emit_with_payload_response', payload); + }); + socket.on('aaa', function() { + socket.emit('aaa_response', 'in news'); + }); +}); + +function serve(req, res) { + fs.readFile(__dirname + '/index.html', function(err, data) { + res.writeHead(200); + res.end(data); + }); +} diff --git a/socketIO_client/tests/ssl.crt b/socketIO_client/tests/ssl.crt new file mode 100644 index 0000000..711ac71 --- /dev/null +++ b/socketIO_client/tests/ssl.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDZTCCAk2gAwIBAgIJAK1HKQ8zF3cCMA0GCSqGSIb3DQEBBQUAMEkxCzAJBgNV +BAYTAlVTMQswCQYDVQQIDAJOWTELMAkGA1UEBwwCTlkxDDAKBgNVBAoMA1hZWjES +MBAGA1UEAwwJbG9jYWxob3N0MB4XDTE1MDQxNTE5NDUwNFoXDTE2MDQxNDE5NDUw +NFowSTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAk5ZMQswCQYDVQQHDAJOWTEMMAoG +A1UECgwDWFlaMRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUA +A4IBDwAwggEKAoIBAQDAQUM9+xbiDeJXg+7X6HgXwla2AnGKWbZ11hZZUYbQwHyq +ABDSqRQXVWvzac6b59/trZiJ7cQEH4+c8ln1C4qbCLvr1aWkL1BDAtSbFUFhQ2Sb +R/xkSUpq35yTuR5+oHgahDg1gbgXgPhB3Y6HoBlYMSpSUKF+INu354kxfYi0t4tP +8f309KUe6eQH3gXgTBR7pPJEUpaPOsrk6UR3cHCMqyzHulyfhgvkk5FN+EtSR9ex +dIrF6WXmfynhsAa/+bxbsgeBF9MNj3zvckCzxdQStdqOvy0mu40/7i9vwguh9cRo +HDn6lx5EaE+gSGU48UNnKX5iQdqEhprNVDj31MiJAgMBAAGjUDBOMB0GA1UdDgQW +BBRkFsPxYU+e6ZSFwmzoS45qiOzAaDAfBgNVHSMEGDAWgBRkFsPxYU+e6ZSFwmzo +S45qiOzAaDAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4IBAQB4JyOA5bZ3 +NbkMvOjpDw+tKzcNiXZIkGfoQ8NC1DbGVhgG7Ps4VjXgUB552YSUV7iwyd3G78OC ++cxEcr+BvxXHXL2Wlxy0c/ZgBjRI5VnGbYQjjI2Iy2qJV+x5IR2oZatv45soZSLq +NFCg2KpOgcSRgs0oDGVBYO0d9m73s/kOySj2NGqVJsaQXqXtLWKnqToaCfl4Vnl+ +zcMdUv8ajBZEPRg6oNi2QIvcNT8fS5gd/T4OXBa7pYuC79yOZ1X6bkKsZrcAdNGM +zO/jH6jKFjIBBx1Of+uZTzfAj/eoTu3foPuUQ+Z9NNE2nkE6SLyBSlxE7wD+SfjS +4/J0PNj22Uh3 +-----END CERTIFICATE----- diff --git a/socketIO_client/tests/ssl.key b/socketIO_client/tests/ssl.key new file mode 100644 index 0000000..5717306 --- /dev/null +++ b/socketIO_client/tests/ssl.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDAQUM9+xbiDeJX +g+7X6HgXwla2AnGKWbZ11hZZUYbQwHyqABDSqRQXVWvzac6b59/trZiJ7cQEH4+c +8ln1C4qbCLvr1aWkL1BDAtSbFUFhQ2SbR/xkSUpq35yTuR5+oHgahDg1gbgXgPhB +3Y6HoBlYMSpSUKF+INu354kxfYi0t4tP8f309KUe6eQH3gXgTBR7pPJEUpaPOsrk +6UR3cHCMqyzHulyfhgvkk5FN+EtSR9exdIrF6WXmfynhsAa/+bxbsgeBF9MNj3zv +ckCzxdQStdqOvy0mu40/7i9vwguh9cRoHDn6lx5EaE+gSGU48UNnKX5iQdqEhprN +VDj31MiJAgMBAAECggEANAFzbxC83+lhkMrfkQgRdFvdmN6QWBxsfvOql/61uUJY +dqQN6O5TwPwad33npcTTjjenS6hFndfrwUjNjLvSgp2aN/FTHVavH3FkkY7uYKEa +VebjHz20I7TZZhxtY1OFKacajV7JrZH1lduY8pccQ/8Is7ub88JvrQ+0zO5oTHnh +KEPYY5r2wLxKrzGm0NavRW9MpiHxz1vUGykvaGq9vR8dVFvZlLC5szYII+BvlII+ +78XMnZbJ9ahT7dzfnzPdPPuyP3m4cdJ9c+7Advs0g2F3K/IDL3jZZCRZIaLxHIs0 +PeI17teW0OmK4RWrnf6dSf0bww05x5In8GzUYgppAQKBgQD4lJVi3UmAB319CGeP +NE4cZFuMneNsuCkNEJONb8Cfsah2maM0if8tUNoR96JorjWgkUTG4oThGSQgJQw8 +fPy6cW4EUhSvWCO+Q4MFFWpTcf0hBiz5O1d06FHVo39o8Ct9dv2bxJqfNtCUUf31 +Fz5tvA+wvByOSazUC3AowQZ6FwKBgQDF/ksJbOBd/bu3ss7eJRjE2sXmuhfxrUiu +P5RoOEqHROAifatJk/3hwT6lx2y1g+3kJpZm9V16dNTkcuybL0yJ/VBE3uWuodrj +i9+wcg8XSnRp3BPVKzebKIKDTMdypOeb1f5yhx6cCtChRm1frKQdoXpMQqptM0jq +w3B4bryWXwKBgQCWSv+nLrPpvJ2aoyI56x3u/J59fliquw3W4FbWBOMpqnh4fJu4 +gFbQRzoR8u82611xH2O9++brUhANf1jOmaMT9tDVu+rVuSyjNJ5azH/kw96PwPQg +HEjcXjpcOOYnxE4HJZJgQ5ZY/QNPKeOp88vC/RlfedyqCtF7ww6lFU+dMQKBgQC2 +M7ut4sne9R8If74rZAwVLBauq1ZZi1O1NsFF33eGX/W7B9bXER+z3vfd61W4/L2x +FWmXOflaNaWsza27aZ2P5tM1bcIEIOKkQBYL9Aq7LkNPH74Ij4rOeEsStVddwy94 +k0di8cFTbAhuQbdpMiCdO/qlrzvS3j0d/djEm3NlFQKBgQCpIrHaMcckCFsf2Y6o +zMnbi3859hve94OOJjauQLlw/nRE/+OaDsDN8iJoxnK0seek8ro1ixSBTScpuX8W +G2DBgqs9NrSQLe6FAckkGqVJdluoh5GewNneAcowkkauj2srnb6XtJDhFtTDY141 +EPbeqGB9PUY9Ny8VzHkAb1vi6g== +-----END PRIVATE KEY----- diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index e479cd1..58824a1 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -1,358 +1,200 @@ -import codecs -import json -import logging -import re import requests import six import socket +import ssl import sys +import threading import time import websocket -import re +from six import string_types from .exceptions import ConnectionError, TimeoutError -from .symmetries import _get_text +from .parsers import ( + encode_engineIO_content, decode_engineIO_content, + format_packet_text, parse_packet_text) +from .symmetries import format_query, memoryview, parse_url if not hasattr(websocket, 'create_connection'): - sys.exit("""Incompatible websocket implementation -- Please make sure that you have websocket-client installed -- Please remove other websocket implementations""") + sys.exit("""\ +An incompatible websocket library is conflicting with the one we need. +You can remove the incompatible library and install the correct one +by running the following commands: + +yes | pip uninstall websocket websocket-client +pip install -U websocket-client""") -TRANSPORTS = 'websocket', 'xhr-polling', 'jsonp-polling' -BOUNDARY = six.u('\ufffd') -TIMEOUT_IN_SECONDS = 3 -_log = logging.getLogger(__name__) -escape_unicode = lambda x: codecs.getdecoder('unicode_escape')(x)[0] -try: - unicode -except NameError: - encode_unicode = lambda x: x -else: - encode_unicode = lambda x: unicode(x).encode('utf-8') +ENGINEIO_PROTOCOL = 3 +TRANSPORTS = 'xhr-polling', 'websocket' -class _AbstractTransport(object): +class AbstractTransport(object): - def __init__(self): - self._packet_id = 0 - self._callback_by_packet_id = {} - self._wants_to_disconnect = False - self._packets = [] + def __init__(self, http_session, is_secure, url, engineIO_session=None): + self.http_session = http_session + self.is_secure = is_secure + self.url = url + self.engineIO_session = engineIO_session - def _log(self, level, msg, *attrs): - _log.log(level, '[%s] %s' % (self._url, msg), *attrs) + def recv_packet(self): + pass - def disconnect(self, path=''): - if not path: - self._wants_to_disconnect = True - if not self.connected: - return - if path: - self.send_packet(0, path) + def send_packet(self, engineIO_packet_type, engineIO_packet_data=''): + pass + + def set_timeout(self, seconds=None): + pass + + +class XHR_PollingTransport(AbstractTransport): + + def __init__(self, http_session, is_secure, url, engineIO_session=None): + super(XHR_PollingTransport, self).__init__( + http_session, is_secure, url, engineIO_session) + self._params = { + 'EIO': ENGINEIO_PROTOCOL, 'transport': 'polling'} + if engineIO_session: + self._request_index = 1 + self._kw_get = dict( + timeout=engineIO_session.ping_timeout) + self._kw_post = dict( + timeout=engineIO_session.ping_timeout, + headers={'content-type': 'application/octet-stream'}) + self._params['sid'] = engineIO_session.id else: - self.close() + self._request_index = 0 + self._kw_get = {} + self._kw_post = {} + http_scheme = 'https' if is_secure else 'http' + self._http_url = '%s://%s/' % (http_scheme, url) + self._request_index_lock = threading.Lock() + self._send_packet_lock = threading.Lock() - def connect(self, path): - self.send_packet(1, path) + def recv_packet(self): + params = dict(self._params) + params['t'] = self._get_timestamp() + response = get_response( + self.http_session.get, + self._http_url, + params=params, + **self._kw_get) + for engineIO_packet in decode_engineIO_content(response.content): + engineIO_packet_type, engineIO_packet_data = engineIO_packet + yield engineIO_packet_type, engineIO_packet_data - def send_heartbeat(self): - self.send_packet(2) + def send_packet(self, engineIO_packet_type, engineIO_packet_data=''): + with self._send_packet_lock: + params = dict(self._params) + params['t'] = self._get_timestamp() + data = encode_engineIO_content([ + (engineIO_packet_type, engineIO_packet_data), + ]) + response = get_response( + self.http_session.post, + self._http_url, + params=params, + data=memoryview(data), + **self._kw_post) + assert response.content == b'ok' - def message(self, path, data, callback): - if isinstance(data, six.string_types): - code = 3 - else: - code = 4 - data = json.dumps(data, ensure_ascii=False) - self.send_packet(code, path, data, callback) + def _get_timestamp(self): + with self._request_index_lock: + timestamp = '%s-%s' % ( + int(time.time() * 1000), self._request_index) + self._request_index += 1 + return timestamp - def emit(self, path, event, args, callback): - data = json.dumps(dict(name=event, args=args), ensure_ascii=False) - self.send_packet(5, path, data, callback) - def ack(self, path, packet_id, *args): - packet_id = packet_id.rstrip('+') - data = '%s+%s' % ( - packet_id, - json.dumps(args, ensure_ascii=False), - ) if args else packet_id - self.send_packet(6, path, data) +class WebsocketTransport(AbstractTransport): - def noop(self, path=''): - self.send_packet(8, path) - - def send_packet(self, code, path='', data='', callback=None): - packet_id = self.set_ack_callback(callback) if callback else '' - packet_parts = str(code), packet_id, path, encode_unicode(data) - if not data: - packet_parts = packet_parts[:-1] - packet_text = ':'.join(packet_parts) - self.send(packet_text) - self._log(logging.DEBUG, '[packet sent] %s', packet_text) - - def recv_packet(self, timeout=None): + def __init__(self, http_session, is_secure, url, engineIO_session=None): + super(WebsocketTransport, self).__init__( + http_session, is_secure, url, engineIO_session) + params = dict(http_session.params, **{ + 'EIO': ENGINEIO_PROTOCOL, 'transport': 'websocket'}) + request = http_session.prepare_request(requests.Request('GET', url)) + kw = {'header': ['%s: %s' % x for x in request.headers.items()]} + if engineIO_session: + params['sid'] = engineIO_session.id + kw['timeout'] = self._timeout = engineIO_session.ping_timeout + ws_url = '%s://%s/?%s' % ( + 'wss' if is_secure else 'ws', url, format_query(params)) + http_scheme = 'https' if is_secure else 'http' + if http_scheme in http_session.proxies: # Use the correct proxy + proxy_url_pack = parse_url(http_session.proxies[http_scheme]) + kw['http_proxy_host'] = proxy_url_pack.hostname + kw['http_proxy_port'] = proxy_url_pack.port + if proxy_url_pack.username: + kw['http_proxy_auth'] = ( + proxy_url_pack.username, proxy_url_pack.password) + if http_session.verify: + if http_session.cert: # Specify certificate path on disk + if isinstance(http_session.cert, string_types): + kw['ca_certs'] = http_session.cert + else: + kw['ca_certs'] = http_session.cert[0] + else: # Do not verify the SSL certificate + kw['sslopt'] = {'cert_reqs': ssl.CERT_NONE} try: - while self._packets: - yield self._packets.pop(0) - except IndexError: - pass - for packet_texts in self.recv(timeout=timeout): - #remove packet separator - packet_texts = re.sub('^\xef\xbf\xbd\w+\xef\xbf\xbd', '', - packet_texts) - - packets = packet_texts.split('\xef\xbf\xbd')[::2] - - for packet_text in packets: - self._log(logging.DEBUG, '[packet received] %s', packet_text) - sep_count = packet_text.count('\xef\xbf\xbd')/2 - - try: - packet_parts = packet_text.split(':', 3) - except AttributeError: - self._log(logging.WARNING, '[packet error] %s', packet_text) - continue - code, packet_id, path, data = None, None, None, None - packet_count = len(packet_parts) - if 4 == packet_count: - code, packet_id, path, data = packet_parts - elif 3 == packet_count: - code, packet_id, path = packet_parts - elif 1 == packet_count: - code = packet_parts[0] - if code and len(code) > 1: - code = code[-1] - - yield code, packet_id, path, data - - def _enqueue_packet(self, packet): - self._packets.append(packet) - - def set_ack_callback(self, callback): - 'Set callback to be called after server sends an acknowledgment' - self._packet_id += 1 - self._callback_by_packet_id[str(self._packet_id)] = callback - return '%s+' % self._packet_id - - def get_ack_callback(self, packet_id): - 'Get callback to be called after server sends an acknowledgment' - callback = self._callback_by_packet_id[packet_id] - del self._callback_by_packet_id[packet_id] - return callback - - @property - def has_ack_callback(self): - return True if self._callback_by_packet_id else False - - -class _WebsocketTransport(_AbstractTransport): - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_WebsocketTransport, self).__init__() - url = '%s://%s/websocket/%s' % ( - 'wss' if is_secure else 'ws', - base_url, socketIO_session.id) - self._url = url - http_session = _prepare_http_session(kw) - req = http_session.prepare_request(requests.Request('GET', url)) - headers = ['%s: %s' % item for item in req.headers.items()] - headers.append('Connection: keep-alive') - - try: - self._connection = websocket.create_connection(url, header=headers) - except socket.timeout as e: + self._connection = websocket.create_connection(ws_url, **kw) + except Exception as e: raise ConnectionError(e) - except socket.error as e: - raise ConnectionError(e) - self._connection.settimeout(TIMEOUT_IN_SECONDS) - @property - def connected(self): - return self._connection.connected - - def send(self, packet_text): + def recv_packet(self): try: - self._connection.send(packet_text) + packet_text = self._connection.recv() except websocket.WebSocketTimeoutException as e: - message = 'timed out while sending %s (%s)' % (packet_text, e) - self._log(logging.WARNING, message) - raise TimeoutError(e) - except socket.error as e: - message = 'disconnected while sending %s (%s)' % (packet_text, e) - self._log(logging.WARNING, message) - raise ConnectionError(message) - - def recv(self, timeout=None): - if timeout: - self._connection.settimeout(timeout) - try: - yield self._connection.recv() - except websocket.WebSocketTimeoutException as e: - raise TimeoutError(e) + raise TimeoutError('recv timed out (%s)' % e) except websocket.SSLError as e: - if 'timed out' in e.message: - raise TimeoutError(e) - else: - raise ConnectionError(e) + raise ConnectionError('recv disconnected by SSL (%s)' % e) except websocket.WebSocketConnectionClosedException as e: - raise ConnectionError('connection closed (%s)' % e) + raise ConnectionError('recv disconnected (%s)' % e) except socket.error as e: - raise ConnectionError(e) + raise ConnectionError('recv disconnected (%s)' % e) + engineIO_packet_type, engineIO_packet_data = parse_packet_text( + six.b(packet_text)) + yield engineIO_packet_type, engineIO_packet_data - def close(self): - self._connection.close() - - -class _XHR_PollingTransport(_AbstractTransport): - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_XHR_PollingTransport, self).__init__() - self._url = '%s://%s/xhr-polling/%s' % ( - 'https' if is_secure else 'http', - base_url, socketIO_session.id) - self._connected = True - self._http_session = _prepare_http_session(kw) - # Create connection - for packet in self.recv_packet(): - self._enqueue_packet(packet) - - @property - def connected(self): - return self._connected - - @property - def _params(self): - return dict(t=int(time.time())) - - def send(self, packet_text): - _get_response( - self._http_session.post, - self._url, - params=self._params, - data=packet_text, - timeout=TIMEOUT_IN_SECONDS) - - def recv(self, timeout=None): - response = _get_response( - self._http_session.get, - self._url, - params=self._params, - timeout=timeout or TIMEOUT_IN_SECONDS, - stream=True) - response_text = _get_text(response) - if not response_text.startswith(BOUNDARY): - yield response_text - return - for packet_text in _yield_text_from_framed_data(response_text): - yield packet_text - - def close(self): - _get_response( - self._http_session.get, - self._url, - params=dict(list(self._params.items()) + [('disconnect', True)])) - self._connected = False - - -class _JSONP_PollingTransport(_AbstractTransport): - - RESPONSE_PATTERN = re.compile(r'io.j\[(\d+)\]\("(.*)"\);') - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_JSONP_PollingTransport, self).__init__() - self._url = '%s://%s/jsonp-polling/%s' % ( - 'https' if is_secure else 'http', - base_url, socketIO_session.id) - self._connected = True - self._http_session = _prepare_http_session(kw) - self._id = 0 - # Create connection - for packet in self.recv_packet(): - self._enqueue_packet(packet) - - @property - def connected(self): - return self._connected - - @property - def _params(self): - return dict(t=int(time.time()), i=self._id) - - def send(self, packet_text): - _get_response( - self._http_session.post, - self._url, - params=self._params, - data='d=%s' % requests.utils.quote(json.dumps(packet_text)), - headers={'content-type': 'application/x-www-form-urlencoded'}, - timeout=TIMEOUT_IN_SECONDS) - - def recv(self, timeout=None): - 'Decode the JavaScript response so that we can parse it as JSON' - response = _get_response( - self._http_session.get, - self._url, - params=self._params, - headers={'content-type': 'text/javascript; charset=UTF-8'}, - timeout=timeout or TIMEOUT_IN_SECONDS) - response_text = _get_text(response) + def send_packet(self, engineIO_packet_type, engineIO_packet_data=''): + packet = format_packet_text(engineIO_packet_type, engineIO_packet_data) try: - self._id, response_text = self.RESPONSE_PATTERN.match( - response_text).groups() - except AttributeError: - self._log(logging.WARNING, '[packet error] %s', response_text) - return - if not response_text.startswith(BOUNDARY): - yield escape_unicode(response_text) - return - for packet_text in _yield_text_from_framed_data( - response_text, escape_unicode): - yield packet_text + self._connection.send(packet) + except websocket.WebSocketTimeoutException as e: + raise TimeoutError('send timed out (%s)' % e) + except socket.error as e: + raise ConnectionError('send disconnected (%s)' % e) + except websocket.WebSocketConnectionClosedException as e: + raise ConnectionError('send disconnected (%s)' % e) - def close(self): - _get_response( - self._http_session.get, - self._url, - params=dict(list(self._params.items()) + [('disconnect', True)])) - self._connected = False + def set_timeout(self, seconds=None): + self._connection.settimeout(seconds or self._timeout) -def _yield_text_from_framed_data(framed_data, parse=lambda x: x): - parts = [parse(x) for x in framed_data.split(BOUNDARY)] - for text_length, text in zip(parts[1::2], parts[2::2]): - if text_length != str(len(text)): - warning = 'invalid declared length=%s for packet_text=%s' % ( - text_length, text) - _log.warn('[packet error] %s', warning) - continue - yield text - - -def _get_response(request, *args, **kw): +def get_response(request, *args, **kw): try: - response = request(*args, **kw) + response = request(*args, stream=True, **kw) except requests.exceptions.Timeout as e: raise TimeoutError(e) - except requests.exceptions.SSLError as e: - raise ConnectionError('could not negotiate SSL (%s)' % e) except requests.exceptions.ConnectionError as e: raise ConnectionError(e) - status = response.status_code - if 200 != status: - raise ConnectionError('unexpected status code (%s)' % status) + except requests.exceptions.SSLError as e: + raise ConnectionError('could not negotiate SSL (%s)' % e) + status_code = response.status_code + if 200 != status_code: + raise ConnectionError('unexpected status code (%s %s)' % ( + status_code, response.text)) return response -def _prepare_http_session(kw): +def prepare_http_session(kw): http_session = requests.Session() http_session.headers.update(kw.get('headers', {})) http_session.auth = kw.get('auth') http_session.proxies.update(kw.get('proxies', {})) http_session.hooks.update(kw.get('hooks', {})) http_session.params.update(kw.get('params', {})) - http_session.verify = kw.get('verify') + http_session.verify = kw.get('verify', True) http_session.cert = kw.get('cert') http_session.cookies.update(kw.get('cookies', {})) return http_session