Compare commits

..

No commits in common. "master" and "v0.4" have entirely different histories.

24 changed files with 731 additions and 1800 deletions

View file

@ -1,19 +0,0 @@
language: python
python:
- 2.6
- 2.7
- 3.4
before_install:
- sudo apt-get update
- sudo apt-get install nodejs
install:
- npm install -G socket.io
- npm install -G yargs
- pip install -U requests
- pip install -U six
- pip install -U websocket-client
- pip install -U coverage
before_script:
- DEBUG=* node socketIO_client/tests/serve.js &
- sleep 3
script: nosetests

View file

@ -1,57 +1,3 @@
0.6.5
-----
- Updated wait loop to be more responsive under websocket transport
0.6.4
-----
- Fixed support for Python 3
- Fixed thread cleanup
0.6.3
-----
- Upgraded to socket.io protocol 1.x for websocket transport
- Added locks to fix concurrency issues with polling transport
- Fixed SSL support
0.6.1
-----
- Upgraded to socket.io protocol 1.x thanks to Sean Arietta and Joe Palmer
0.5.6
-----
- Backported to support requests 0.8.2
0.5.5
-----
- Fixed reconnection in the event of server restart
- Fixed calling on_reconnect() so that it is actually called
- Set default Namespace=None
- Added support for Python 3.4
0.5.3
-----
- Updated wait loop to exit if the client wants to disconnect
- Fixed calling on_connect() so that it is called only once
- Set heartbeat_interval to be half of the heartbeat_timeout
0.5.2
-----
- Replaced secure=True with host='https://example.com'
- Fixed sending heartbeats thanks to Travis Odom
0.5.1
-----
- Added error handling in the event of websocket timeout
- Fixed sending acknowledgments in custom namespaces thanks to Travis Odom
0.5
---
- Rewrote library to use coroutines instead of threads to save memory
- Improved connection resilience
- Added support for xhr-polling thanks to Francis Bull
- Added support for jsonp-polling thanks to Bernard Pratz
- Added support for query params and cookies
0.4 0.4
--- ---
- Added support for custom headers and proxies thanks to Rui and Sajal - Added support for custom headers and proxies thanks to Rui and Sajal
@ -72,5 +18,7 @@
0.1 0.1
--- ---
- Wrapped `code from StackOverflow <http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client>`_ - Wrapped code from StackOverflow_
- Added exception handling to destructor in case of connection failure - Added exception handling to destructor in case of connection failure
.. _StackOverflow: http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client

18
LICENSE
View file

@ -1,19 +1,7 @@
Copyright (c) 2013 Roy Hyunjin Han and contributors Copyright (c) 2013 Roy Hyunjin Han and contributors
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View file

@ -1,3 +1,3 @@
recursive-include socketIO_client * recursive-include socketIOClient *
include *.html *.js *.rst include *.rst
global-exclude *.pyc global-exclude *.pyc

View file

@ -1,17 +1,11 @@
.. image:: https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=master
:target: https://travis-ci.org/invisibleroads/socketIO-client
socketIO-client socketIO-client
=============== ===============
Here is a `socket.io <http://socket.io>`_ client library for Python. You can use it to write test code for your socket.io server. Here is a socket.io_ client library for Python. You can use it to write test code for your socket.io_ server.
Please note that this version implements `socket.io protocol 1.x <https://github.com/automattic/socket.io-protocol>`_, which is not backwards compatible. If you want to communicate using `socket.io protocol 0.9 <https://github.com/learnboost/socket.io-spec>`_ (which is compatible with `gevent-socketio <https://github.com/abourget/gevent-socketio>`_), please use `socketIO-client 0.5.6 <https://pypi.python.org/pypi/socketIO-client/0.5.6>`_.
Installation Installation
------------ ------------
Install the package in an isolated environment. :: ::
VIRTUAL_ENV=$HOME/.virtualenv VIRTUAL_ENV=$HOME/.virtualenv
@ -32,49 +26,33 @@ Activate isolated environment. ::
VIRTUAL_ENV=$HOME/.virtualenv VIRTUAL_ENV=$HOME/.virtualenv
source $VIRTUAL_ENV/bin/activate source $VIRTUAL_ENV/bin/activate
Launch your socket.io server. ::
# Get package folder
PACKAGE_FOLDER=`python -c "import os, socketIO_client;\
print(os.path.dirname(socketIO_client.__file__))"`
# Start socket.io server
DEBUG=* node $PACKAGE_FOLDER/tests/serve.js
# Start proxy server in a separate terminal on the same machine
DEBUG=* node $PACKAGE_FOLDER/tests/proxy.js
For debugging information, run these commands first. ::
import logging
logging.getLogger('requests').setLevel(logging.WARNING)
logging.basicConfig(level=logging.DEBUG)
Emit. :: Emit. ::
from socketIO_client import SocketIO, LoggingNamespace from socketIO_client import SocketIO
with SocketIO('localhost', 8000, LoggingNamespace) as socketIO: with SocketIO('localhost', 8000) as socketIO:
socketIO.emit('aaa') socketIO.emit('aaa')
socketIO.wait(seconds=1) socketIO.wait(seconds=1)
Emit with callback. :: Emit with callback. ::
from socketIO_client import SocketIO, LoggingNamespace from socketIO_client import SocketIO
def on_bbb_response(*args): def on_bbb_response(*args):
print('on_bbb_response', args) print 'on_bbb_response', args
with SocketIO('localhost', 8000, LoggingNamespace) as socketIO: with SocketIO('localhost', 8000) as socketIO:
socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response) socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response)
socketIO.wait_for_callbacks(seconds=1) socketIO.wait_for_callbacks(seconds=1)
Define events. :: Define events. ::
from socketIO_client import SocketIO, LoggingNamespace from socketIO_client import SocketIO
def on_aaa_response(*args): def on_aaa_response(*args):
print('on_aaa_response', args) print 'on_aaa_response', args
socketIO = SocketIO('localhost', 8000, LoggingNamespace) socketIO = SocketIO('localhost', 8000)
socketIO.on('aaa_response', on_aaa_response) socketIO.on('aaa_response', on_aaa_response)
socketIO.emit('aaa') socketIO.emit('aaa')
socketIO.wait(seconds=1) socketIO.wait(seconds=1)
@ -86,10 +64,11 @@ Define events in a namespace. ::
class Namespace(BaseNamespace): class Namespace(BaseNamespace):
def on_aaa_response(self, *args): def on_aaa_response(self, *args):
print('on_aaa_response', args) print 'on_aaa_response', args
self.emit('bbb') self.emit('bbb')
socketIO = SocketIO('localhost', 8000, Namespace) socketIO = SocketIO('localhost', 8000)
socketIO.define(Namespace)
socketIO.emit('aaa') socketIO.emit('aaa')
socketIO.wait(seconds=1) socketIO.wait(seconds=1)
@ -100,9 +79,10 @@ Define standard events. ::
class Namespace(BaseNamespace): class Namespace(BaseNamespace):
def on_connect(self): def on_connect(self):
print('[Connected]') print '[Connected]'
socketIO = SocketIO('localhost', 8000, Namespace) socketIO = SocketIO('localhost', 8000)
socketIO.define(Namespace)
socketIO.wait(seconds=1) socketIO.wait(seconds=1)
Define different namespaces on a single socket. :: Define different namespaces on a single socket. ::
@ -112,45 +92,36 @@ Define different namespaces on a single socket. ::
class ChatNamespace(BaseNamespace): class ChatNamespace(BaseNamespace):
def on_aaa_response(self, *args): def on_aaa_response(self, *args):
print('on_aaa_response', args) print 'on_aaa_response', args
class NewsNamespace(BaseNamespace): class NewsNamespace(BaseNamespace):
def on_aaa_response(self, *args): def on_aaa_response(self, *args):
print('on_aaa_response', args) print 'on_aaa_response', args
socketIO = SocketIO('localhost', 8000) socketIO = SocketIO('localhost', 8000)
chat_namespace = socketIO.define(ChatNamespace, '/chat') chatNamespace = socketIO.define(ChatNamespace, '/chat')
news_namespace = socketIO.define(NewsNamespace, '/news') newsNamespace = socketIO.define(NewsNamespace, '/news')
chat_namespace.emit('aaa') chatNamespace.emit('aaa')
news_namespace.emit('aaa') newsNamespace.emit('aaa')
socketIO.wait(seconds=1) socketIO.wait(seconds=1)
Connect via SSL. :: Open secure websockets (HTTPS / WSS) behind a proxy. ::
from socketIO_client import SocketIO from socketIO_client import SocketIO
SocketIO('https://localhost', verify=False) SocketIO('localhost', 8000,
secure=True,
proxies={'https': 'https://proxy.example.com:8080'})
Specify params, headers, cookies, proxies thanks to the `requests <http://python-requests.org>`_ library. :: Specify custom headers thanks to the `requests`_ library. ::
from socketIO_client import SocketIO from socketIO_client import SocketIO
from base64 import b64encode from base64 import b64encode
SocketIO( SocketIO('localhost', 8000,
localhost', 8000, headers={'Authorization': 'Basic ' + b64encode('username:password')})
params={'q': 'qqq'},
headers={'Authorization': 'Basic ' + b64encode('username:password')},
cookies={'a': 'aaa'},
proxies={'https': 'https://proxy.example.com:8080'})
Wait forever. ::
from socketIO_client import SocketIO
socketIO = SocketIO('localhost', 8000)
socketIO.wait()
License License
@ -160,11 +131,30 @@ This software is available under the MIT License.
Credits Credits
------- -------
- `Guillermo Rauch <https://github.com/rauchg>`_ wrote the `socket.io specification <https://github.com/automattic/socket.io-protocol>`_. - `Guillermo Rauch`_ wrote the `socket.io specification`_.
- `Hiroki Ohtani <https://github.com/liris>`_ wrote `websocket-client <https://github.com/liris/websocket-client>`_. - `Hiroki Ohtani`_ wrote websocket-client_.
- `rod <http://stackoverflow.com/users/370115/rod>`_ wrote a `prototype for a Python client to a socket.io server <http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client>`_. - rod_ wrote a `prototype for a Python client to a socket.io server`_ on StackOverflow.
- `Alexandre Bourget <https://github.com/abourget>`_ wrote `gevent-socketio <https://github.com/abourget/gevent-socketio>`_, which is a socket.io server written in Python. - `Alexandre Bourget`_ wrote gevent-socketio_, which is a socket.io server written in Python.
- `Paul Kienzle <https://github.com/pkienzle>`_, `Zac Lee <https://github.com/zratic>`_, `Josh VanderLinden <https://github.com/codekoala>`_, `Ian Fitzpatrick <https://github.com/ifitzpatrick>`_, `Lucas Klein <https://github.com/lukasklein>`_, `Rui Chicoria <https://github.com/rchicoria>`_, `Travis Odom <https://github.com/burstaholic>`_, `Patrick Huber <https://github.com/stackmagic>`_, `Brad Campbell <https://github.com/bradjc>`_, `Daniel <https://github.com/dabidan>`_, `Sean Arietta <https://github.com/sarietta>`_ submitted code to expand support of the socket.io protocol. - `Paul Kienzle`_, `Zac Lee`_, `Josh VanderLinden`_, `Ian Fitzpatrick`_, `Lucas Klein`_ submitted code to expand support of the socket.io protocol.
- `Bernard Pratz <https://github.com/guyzmo>`_, `Francis Bull <https://github.com/franbull>`_ wrote prototypes to support xhr-polling and jsonp-polling.
- `Eric Chen <https://github.com/taiyangc>`_, `Denis Zinevich <https://github.com/dzinevich>`_, `Thiago Hersan <https://github.com/thiagohersan>`_, `Nayef Copty <https://github.com/nayefc>`_, `Jörgen Karlsson <https://github.com/jorgen-k>`_, `Branden Ghena <https://github.com/brghena>`_, `Tim Landscheidt <https://github.com/scfc>`_, `Matt Porritt <https://github.com/mattporritt>`_ suggested ways to make the connection more robust.
- `Merlijn van Deen <https://github.com/valhallasw>`_, `Frederic Sureau <https://github.com/fredericsureau>`_, `Marcus Cobden <https://github.com/leth>`_, `Drew Hutchison <https://github.com/drewhutchison>`_, `wuurrd <https://github.com/wuurrd>`_, `Adam Kecer <https://github.com/amfg>`_, `Alex Monk <https://github.com/Krenair>`_, `Vishal P R <https://github.com/vishalwy>`_, `John Vandenberg <https://github.com/jayvdb>`_, `Thomas Grainger <https://github.com/graingert>`_ proposed changes that make the library more friendly and practical for you! .. _socket.io: http://socket.io
.. _requests: http://python-requests.org
.. _Guillermo Rauch: https://github.com/guille
.. _socket.io specification: https://github.com/LearnBoost/socket.io-spec
.. _Hiroki Ohtani: https://github.com/liris
.. _websocket-client: https://github.com/liris/websocket-client
.. _rod: http://stackoverflow.com/users/370115/rod
.. _prototype for a Python client to a socket.io server: http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client
.. _Alexandre Bourget: https://github.com/abourget
.. _gevent-socketio: https://github.com/abourget/gevent-socketio
.. _Paul Kienzle: https://github.com/pkienzle
.. _Zac Lee: https://github.com/zratic
.. _Josh VanderLinden: https://github.com/codekoala
.. _Ian Fitzpatrick: https://github.com/GraphEffect
.. _Lucas Klein: https://github.com/lukashed

View file

@ -1,48 +1,7 @@
= Consider supporting both protocols in the same library = Resolve pull requests
https://github.com/invisibleroads/socketIO-client/issues/95 Resolve issues
Add binary support Investigate issue #8
https://github.com/invisibleroads/socketIO-client/pull/85 Examine forks
https://github.com/invisibleroads/socketIO-client/issues/70 Integrate Sajal's fork #7
https://github.com/invisibleroads/socketIO-client/issues/71 Integrate Francis's fork #10
https://github.com/invisibleroads/socketIO-client/issues/91 Integrate Paul's fork
Check requests dependency
https://github.com/invisibleroads/socketIO-client/issues/92
https://github.com/invisibleroads/socketIO-client/pull/93
https://github.com/invisibleroads/socketIO-client/commit/b288d89c15d452a30bfeb00f38494f59f71f5a43
Check SSL
https://github.com/invisibleroads/socketIO-client/issues/86
https://github.com/invisibleroads/socketIO-client/pull/87
Look into invalid namespace handling
https://github.com/invisibleroads/socketIO-client/issues/84
Check unicode issues
https://github.com/invisibleroads/socketIO-client/issues/81
Check python3 support for socketIO-client 0.5.6
https://github.com/invisibleroads/socketIO-client/issues/83
Check OK assertion
https://github.com/invisibleroads/socketIO-client/issues/99
https://github.com/invisibleroads/socketIO-client/pull/103
Check why connected=True after termination
https://github.com/invisibleroads/socketIO-client/issues/80
https://github.com/invisibleroads/socketIO-client/issues/98
Consider catching heartbeat thread exception
https://github.com/invisibleroads/socketIO-client/issues/100
Check why it blocks when defining a namespace
https://github.com/invisibleroads/socketIO-client/issues/96
Check why transports are not being set
https://github.com/invisibleroads/socketIO-client/issues/102
Look at 404 not found error
https://github.com/invisibleroads/socketIO-client/issues/101
Look at socketio off and socketio once
https://github.com/invisibleroads/socketIO-client/pull/94
Implement rooms
https://github.com/invisibleroads/socketIO-client/issues/72
https://github.com/invisibleroads/socketIO-client/pull/65
Check tests
https://github.com/invisibleroads/socketIO-client/issues/90
Check whether it works on Windows 8
https://github.com/invisibleroads/socketIO-client/issues/97
Add debian packaging support
https://github.com/invisibleroads/socketIO-client/pull/89
+ Review issues and pull requests
+ Order issues

71
serve_tests.js Normal file
View file

@ -0,0 +1,71 @@
var io = require('socket.io').listen(8000);
var main = io.of('').on('connection', function(socket) {
socket.on('message', function(data, fn) {
if (fn) { // Client expects a callback
if (data) {
fn(data);
} else {
fn();
}
} else if (typeof data === 'object') {
socket.json.send(data ? data : 'message_response'); // object or null
} else {
socket.send(data ? data : 'message_response'); // string or ''
}
});
socket.on('emit', function() {
socket.emit('emit_response');
});
socket.on('emit_with_payload', function(payload) {
socket.emit('emit_with_payload_response', payload);
});
socket.on('emit_with_multiple_payloads', function(payload, payload) {
socket.emit('emit_with_multiple_payloads_response', payload, payload);
});
socket.on('emit_with_callback', function(fn) {
fn();
});
socket.on('emit_with_callback_with_payload', function(fn) {
fn(PAYLOAD);
});
socket.on('emit_with_callback_with_multiple_payloads', function(fn) {
fn(PAYLOAD, PAYLOAD);
});
socket.on('emit_with_event', function(payload) {
socket.emit('emit_with_event_response', payload);
});
socket.on('ack', function(payload) {
socket.emit('ack_response', payload, function(payload) {
socket.emit('ack_callback_response', payload);
});
});
socket.on('aaa', function() {
socket.emit('aaa_response', PAYLOAD);
});
socket.on('bbb', function(payload, fn) {
if (fn) {
fn(payload);
}
});
});
var chat = io.of('/chat').on('connection', function (socket) {
socket.on('emit_with_payload', function(payload) {
socket.emit('emit_with_payload_response', payload);
});
socket.on('aaa', function() {
socket.emit('aaa_response', 'in chat');
});
});
var news = io.of('/news').on('connection', function (socket) {
socket.on('emit_with_payload', function(payload) {
socket.emit('emit_with_payload_response', payload);
});
socket.on('aaa', function() {
socket.emit('aaa_response', 'in news');
});
});
var PAYLOAD = {'xxx': 'yyy'};

View file

@ -1,42 +1,31 @@
import io import os
from os.path import abspath, dirname, join from setuptools import setup, find_packages
from setuptools import find_packages, setup
REQUIREMENTS = [ here = os.path.abspath(os.path.dirname(__file__))
'requests', README = open(os.path.join(here, 'README.rst')).read()
'six', CHANGES = open(os.path.join(here, 'CHANGES.rst')).read()
'websocket-client',
]
HERE = dirname(abspath(__file__))
LOAD_TEXT = lambda name: io.open(join(HERE, name), encoding='UTF-8').read()
DESCRIPTION = '\n\n'.join(LOAD_TEXT(_) for _ in [
'README.rst',
'CHANGES.rst',
])
setup( setup(
name='socketIO_client', name='socketIO-client',
version='0.6.5', version='0.4',
description='A socket.io client library', description='A socket.io client library',
long_description=DESCRIPTION, long_description=README + '\n\n' + CHANGES,
license='MIT', license='MIT',
classifiers=[ classifiers=[
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'Programming Language :: Python', 'Programming Language :: Python',
'License :: OSI Approved :: MIT License', 'License :: OSI Approved :: MIT License',
'Development Status :: 5 - Production/Stable',
], ],
keywords='socket.io node.js', keywords='socket.io node.js',
author='Roy Hyunjin Han', author='Roy Hyunjin Han',
author_email='rhh@crosscompute.com', author_email='rhh@crosscompute.com',
url='https://github.com/invisibleroads/socketIO-client', url='https://github.com/invisibleroads/socketIO-client',
install_requires=REQUIREMENTS, install_requires=[
tests_require=[ 'requests',
'nose', 'websocket-client',
'coverage',
], ],
packages=find_packages(), packages=find_packages(),
include_package_data=True, include_package_data=True,
zip_safe=False) zip_safe=True)

View file

@ -1,515 +1,444 @@
from .exceptions import ConnectionError, TimeoutError, PacketError import requests
from .heartbeats import HeartbeatThread import socket
from .logs import LoggingMixin from json import dumps, loads
from .namespaces import ( from threading import Thread, Event
EngineIONamespace, SocketIONamespace, LoggingSocketIONamespace, from time import sleep
find_callback) from websocket import WebSocketConnectionClosedException, create_connection
from .parsers import (
parse_host, parse_engineIO_session,
format_socketIO_packet_data, parse_socketIO_packet_data,
get_namespace_path)
from .symmetries import get_character
from .transports import (
WebsocketTransport, XHR_PollingTransport, prepare_http_session, TRANSPORTS)
__all__ = 'SocketIO', 'SocketIONamespace' PROTOCOL = 1 # socket.io protocol version
__version__ = '0.6.3'
BaseNamespace = SocketIONamespace
LoggingNamespace = LoggingSocketIONamespace
def retry(f): class BaseNamespace(object): # pragma: no cover
def wrap(*args, **kw): 'Define socket.io behavior'
self = args[0]
try:
return f(*args, **kw)
except (TimeoutError, ConnectionError):
self._opened = False
return f(*args, **kw)
return wrap
def __init__(self, _socketIO, path):
self._socketIO = _socketIO
self._path = path
self._callbackByEvent = {}
self.initialize()
class EngineIO(LoggingMixin): def initialize(self):
'Initialize custom variables here; you can override this method'
def __init__(
self, host, port=None, Namespace=EngineIONamespace,
wait_for_connection=True, transports=TRANSPORTS,
resource='engine.io', hurry_interval_in_seconds=1, **kw):
self._is_secure, self._url = parse_host(host, port, resource)
self._wait_for_connection = wait_for_connection
self._client_transports = transports
self._hurry_interval_in_seconds = hurry_interval_in_seconds
self._http_session = prepare_http_session(kw)
self._log_name = self._url
self._wants_to_close = False
self._opened = False
if Namespace:
self.define(Namespace)
self._transport
# Connect
@property
def _transport(self):
if self._opened:
return self._transport_instance
self._engineIO_session = self._get_engineIO_session()
self._negotiate_transport()
self._connect_namespaces()
self._opened = True
self._reset_heartbeat()
return self._transport_instance
def _get_engineIO_session(self):
warning_screen = self._yield_warning_screen()
for elapsed_time in warning_screen:
transport = XHR_PollingTransport(
self._http_session, self._is_secure, self._url)
try:
engineIO_packet_type, engineIO_packet_data = next(
transport.recv_packet())
break
except (TimeoutError, ConnectionError) as e:
if not self._wait_for_connection:
raise
warning = Exception('[waiting for connection] %s' % e)
warning_screen.throw(warning)
assert engineIO_packet_type == 0 # engineIO_packet_type == open
return parse_engineIO_session(engineIO_packet_data)
def _negotiate_transport(self):
self._transport_instance = self._get_transport('xhr-polling')
self.transport_name = 'xhr-polling'
is_ws_client = 'websocket' in self._client_transports
is_ws_server = 'websocket' in self._engineIO_session.transport_upgrades
if is_ws_client and is_ws_server:
try:
transport = self._get_transport('websocket')
transport.send_packet(2, 'probe')
for packet_type, packet_data in transport.recv_packet():
if packet_type == 3 and packet_data == b'probe':
transport.send_packet(5, '')
self._transport_instance = transport
self.transport_name = 'websocket'
else:
self._warn('unexpected engine.io packet')
except Exception:
pass
self._debug('[transport selected] %s', self.transport_name)
def _reset_heartbeat(self):
try:
self._heartbeat_thread.halt()
hurried = self._heartbeat_thread.hurried
except AttributeError:
hurried = False
ping_interval = self._engineIO_session.ping_interval
if self.transport_name.endswith('-polling'):
# Use ping/pong to unblock recv for polling transport
hurry_interval_in_seconds = self._hurry_interval_in_seconds
else:
# Use timeout to unblock recv for websocket transport
hurry_interval_in_seconds = ping_interval
self._heartbeat_thread = HeartbeatThread(
send_heartbeat=self._ping,
relax_interval_in_seconds=ping_interval,
hurry_interval_in_seconds=hurry_interval_in_seconds)
self._heartbeat_thread.start()
if hurried:
self._heartbeat_thread.hurry()
self._debug('[heartbeat reset]')
def _connect_namespaces(self):
pass pass
def _get_transport(self, transport_name): def on_connect(self):
SelectedTransport = { 'Called when socket is connecting; you can override this method'
'xhr-polling': XHR_PollingTransport, pass
'websocket': WebsocketTransport,
}[transport_name] def on_disconnect(self):
return SelectedTransport( 'Called when socket is disconnecting; you can override this method'
self._http_session, self._is_secure, self._url, pass
self._engineIO_session)
def on_error(self, reason, advice):
'Called when server sends an error; you can override this method'
print '[Error] %s' % advice
def on_message(self, data):
'Called when server sends a message; you can override this method'
print '[Message] %s' % data
def on_event(self, event, *args):
"""
Called when server emits an event; you can override this method.
Called only if the program cannot find a more specific event handler,
such as one defined by namespace.on('my_event', my_function).
"""
callback, args = find_callback(args)
arguments = [repr(_) for _ in args]
if callback:
arguments.append('callback(*args)')
callback(*args)
print '[Event] %s(%s)' % (event, ', '.join(arguments))
def on_open(self, *args):
print '[Open]', args
def on_close(self, *args):
print '[Close]', args
def on_retry(self, *args):
print '[Retry]', args
def on_reconnect(self, *args):
print '[Reconnect]', args
def message(self, data='', callback=None):
self._socketIO.message(data, callback, path=self._path)
def emit(self, event, *args, **kw):
kw['path'] = self._path
self._socketIO.emit(event, *args, **kw)
def on(self, event, callback):
'Define a callback to handle a custom event emitted by the server'
self._callbackByEvent[event] = callback
def _get_eventCallback(self, event):
# Check callbacks defined by on()
try:
return self._callbackByEvent[event]
except KeyError:
pass
# Check callbacks defined explicitly or use on_event()
callback = lambda *args: self.on_event(event, *args)
return getattr(self, 'on_' + event.replace(' ', '_'), callback)
class SocketIO(object):
def __init__(self, host, port, secure=False, headers=None, proxies=None):
"""
Create a socket.io client that connects to a socket.io server
at the specified host and port. Set secure=True to use HTTPS / WSS.
SocketIO('localhost', 8000, secure=True,
proxies={'https': 'https://proxy.example.com:8080'})
"""
self._socketIO = _SocketIO(host, port, secure, headers, proxies)
self._namespaceByPath = {}
self.define(BaseNamespace) # Define default namespace
self._rhythmicThread = _RhythmicThread(
self._socketIO.heartbeatInterval,
self._socketIO.send_heartbeat)
self._rhythmicThread.start()
self._listenerThread = _ListenerThread(
self._socketIO,
self._namespaceByPath)
self._listenerThread.start()
def __enter__(self): def __enter__(self):
return self return self
def __exit__(self, *exception_pack): def __exit__(self, exc_type, exc_value, traceback):
self._close() self.disconnect()
def __del__(self): def __del__(self):
self._close() self.disconnect(close=False)
# Define
def define(self, Namespace):
self._namespace = namespace = Namespace(self)
return namespace
def on(self, event, callback):
try:
namespace = self.get_namespace()
except PacketError:
namespace = self.define(EngineIONamespace)
return namespace.on(event, callback)
def get_namespace(self):
try:
return self._namespace
except AttributeError:
raise PacketError('undefined engine.io namespace')
# Act
def send(self, engineIO_packet_data):
self._message(engineIO_packet_data)
def _open(self):
engineIO_packet_type = 0
self._transport_instance.send_packet(engineIO_packet_type)
def _close(self):
self._wants_to_close = True
try:
self._heartbeat_thread.halt()
except AttributeError:
pass
if not self._opened:
return
engineIO_packet_type = 1
try:
self._transport_instance.send_packet(engineIO_packet_type)
except (TimeoutError, ConnectionError):
pass
self._opened = False
def _ping(self, engineIO_packet_data=''):
engineIO_packet_type = 2
self._transport_instance.send_packet(
engineIO_packet_type, engineIO_packet_data)
def _pong(self, engineIO_packet_data=''):
engineIO_packet_type = 3
self._transport_instance.send_packet(
engineIO_packet_type, engineIO_packet_data)
@retry
def _message(self, engineIO_packet_data, with_transport_instance=False):
engineIO_packet_type = 4
if with_transport_instance:
transport = self._transport_instance
else:
transport = self._transport
transport.send_packet(engineIO_packet_type, engineIO_packet_data)
self._debug('[socket.io packet sent] %s', engineIO_packet_data)
def _upgrade(self):
engineIO_packet_type = 5
self._transport_instance.send_packet(engineIO_packet_type)
def _noop(self):
engineIO_packet_type = 6
self._transport_instance.send_packet(engineIO_packet_type)
# React
def wait(self, seconds=None, **kw):
'Wait in a loop and react to events as defined in the namespaces'
# Use ping/pong to unblock recv for polling transport
self._heartbeat_thread.hurry()
# Use timeout to unblock recv for websocket transport
self._transport.set_timeout(seconds=1)
# Listen
warning_screen = self._yield_warning_screen(seconds)
for elapsed_time in warning_screen:
if self._should_stop_waiting(**kw):
break
try:
try:
self._process_packets()
except TimeoutError:
pass
except ConnectionError as e:
self._opened = False
try:
warning = Exception('[connection error] %s' % e)
warning_screen.throw(warning)
except StopIteration:
self._warn(warning)
try:
namespace = self.get_namespace()
namespace.on_disconnect()
except PacketError:
pass
self._heartbeat_thread.relax()
self._transport.set_timeout()
def _should_stop_waiting(self):
return self._wants_to_close
def _process_packets(self):
for engineIO_packet in self._transport.recv_packet():
try:
self._process_packet(engineIO_packet)
except PacketError as e:
self._warn('[packet error] %s', e)
def _process_packet(self, packet):
engineIO_packet_type, engineIO_packet_data = packet
# Launch callbacks
namespace = self.get_namespace()
try:
delegate = {
0: self._on_open,
1: self._on_close,
2: self._on_ping,
3: self._on_pong,
4: self._on_message,
5: self._on_upgrade,
6: self._on_noop,
}[engineIO_packet_type]
except KeyError:
raise PacketError(
'unexpected engine.io packet type (%s)' % engineIO_packet_type)
delegate(engineIO_packet_data, namespace)
if engineIO_packet_type is 4:
return engineIO_packet_data
def _on_open(self, data, namespace):
namespace._find_packet_callback('open')()
def _on_close(self, data, namespace):
namespace._find_packet_callback('close')()
def _on_ping(self, data, namespace):
self._pong(data)
namespace._find_packet_callback('ping')(data)
def _on_pong(self, data, namespace):
namespace._find_packet_callback('pong')(data)
def _on_message(self, data, namespace):
namespace._find_packet_callback('message')(data)
def _on_upgrade(self, data, namespace):
namespace._find_packet_callback('upgrade')()
def _on_noop(self, data, namespace):
namespace._find_packet_callback('noop')()
class SocketIO(EngineIO):
"""Create a socket.io client that connects to a socket.io server
at the specified host and port.
- Define the behavior of the client by specifying a custom Namespace.
- Prefix host with https:// to use SSL.
- Set wait_for_connection=True to block until we have a connection.
- Specify desired transports=['websocket', 'xhr-polling'].
- Pass query params, headers, cookies, proxies as keyword arguments.
SocketIO(
'localhost', 8000,
params={'q': 'qqq'},
headers={'Authorization': 'Basic ' + b64encode('username:password')},
cookies={'a': 'aaa'},
proxies={'https': 'https://proxy.example.com:8080'})
"""
def __init__(
self, host, port=None, Namespace=SocketIONamespace,
wait_for_connection=True, transports=TRANSPORTS,
resource='socket.io', hurry_interval_in_seconds=1, **kw):
self._namespace_by_path = {}
self._callback_by_ack_id = {}
self._ack_id = 0
super(SocketIO, self).__init__(
host, port, Namespace, wait_for_connection, transports,
resource, hurry_interval_in_seconds, **kw)
# Connect
@property @property
def connected(self): def connected(self):
return self._opened return self._socketIO.connected
def _connect_namespaces(self): def disconnect(self, path='', close=True):
for path, namespace in self._namespace_by_path.items(): if self.connected:
namespace._transport = self._transport_instance self._socketIO.disconnect(path, close)
if path: if path:
self.connect(path, with_transport_instance=True) del self._namespaceByPath[path]
else:
def __exit__(self, *exception_pack): self._rhythmicThread.cancel()
self.disconnect() self._listenerThread.cancel()
super(SocketIO, self).__exit__(*exception_pack)
def __del__(self):
self.disconnect()
super(SocketIO, self).__del__()
# Define
def define(self, Namespace, path=''): def define(self, Namespace, path=''):
self._namespace_by_path[path] = namespace = Namespace(self, path)
if path: if path:
self.connect(path) self._socketIO.connect(path)
self.wait(for_connect=True) namespace = Namespace(self._socketIO, path)
self._namespaceByPath[path] = namespace
return namespace return namespace
def on(self, event, callback, path=''):
try:
namespace = self.get_namespace(path)
except PacketError:
namespace = self.define(SocketIONamespace, path)
return namespace.on(event, callback)
def get_namespace(self, path=''): def get_namespace(self, path=''):
try: return self._namespaceByPath[path]
return self._namespace_by_path[path]
except KeyError:
raise PacketError('undefined socket.io namespace (%s)' % path)
# Act def on(self, event, callback, path=''):
return self.get_namespace(path).on(event, callback)
def connect(self, path, with_transport_instance=False): def message(self, data='', callback=None, path=''):
socketIO_packet_type = 0 self._socketIO.message(data, callback, path)
socketIO_packet_data = format_socketIO_packet_data(path)
self._message(
str(socketIO_packet_type) + socketIO_packet_data,
with_transport_instance)
def disconnect(self, path=''):
if not path or not self._opened:
self._close()
elif path:
socketIO_packet_type = 1
socketIO_packet_data = format_socketIO_packet_data(path)
try:
self._message(str(socketIO_packet_type) + socketIO_packet_data)
except (TimeoutError, ConnectionError):
pass
try:
namespace = self._namespace_by_path.pop(path)
namespace.on_disconnect()
except KeyError:
pass
def emit(self, event, *args, **kw): def emit(self, event, *args, **kw):
path = kw.get('path', '') self._socketIO.emit(event, *args, **kw)
callback, args = find_callback(args, kw)
ack_id = self._set_ack_callback(callback) if callback else None
args = [event] + list(args)
socketIO_packet_type = 2
socketIO_packet_data = format_socketIO_packet_data(path, ack_id, args)
self._message(str(socketIO_packet_type) + socketIO_packet_data)
def send(self, data='', callback=None, **kw): def wait(self, seconds=None):
path = kw.get('path', '') if seconds:
args = [data] self._listenerThread.wait(seconds)
if callback: else:
args.append(callback) try:
self.emit('message', *args, path=path) while self.connected:
sleep(1)
def _ack(self, path, ack_id, *args): except KeyboardInterrupt:
socketIO_packet_type = 3 pass
socketIO_packet_data = format_socketIO_packet_data(path, ack_id, args)
self._message(str(socketIO_packet_type) + socketIO_packet_data)
# React
def wait_for_callbacks(self, seconds=None): def wait_for_callbacks(self, seconds=None):
self.wait(seconds, for_callbacks=True) self._listenerThread.wait_for_callbacks(seconds)
def _should_stop_waiting(self, for_connect=False, for_callbacks=False):
if for_connect:
for namespace in self._namespace_by_path.values():
is_namespace_connected = getattr(
namespace, '_connected', False)
if not is_namespace_connected:
return False
return True
if for_callbacks and not self._has_ack_callback:
return True
return super(SocketIO, self)._should_stop_waiting()
def _process_packet(self, packet): class _RhythmicThread(Thread):
engineIO_packet_data = super(SocketIO, self)._process_packet(packet) 'Execute call every few seconds'
if engineIO_packet_data is None:
return daemon = True
self._debug('[socket.io packet received] %s', engineIO_packet_data)
socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) def __init__(self, intervalInSeconds, call, *args, **kw):
socketIO_packet_data = engineIO_packet_data[1:] super(_RhythmicThread, self).__init__()
# Launch callbacks self.intervalInSeconds = intervalInSeconds
path = get_namespace_path(socketIO_packet_data) self.call = call
namespace = self.get_namespace(path) self.args = args
self.kw = kw
self.done = Event()
def run(self):
while not self.done.is_set():
self.call(*self.args, **self.kw)
self.done.wait(self.intervalInSeconds)
def cancel(self):
self.done.set()
class _ListenerThread(Thread):
'Process messages from socket.io server'
daemon = True
def __init__(self, _socketIO, _namespaceByPath):
super(_ListenerThread, self).__init__()
self._socketIO = _socketIO
self._namespaceByPath = _namespaceByPath
self.done = Event()
self.ready = Event()
self.ready.set()
def cancel(self):
self.done.set()
def wait(self, seconds):
self.done.wait(seconds)
def wait_for_callbacks(self, seconds):
self.ready.clear()
self.ready.wait(seconds)
def get_ackCallback(self, packetID):
return lambda *args: self._socketIO.ack(packetID, *args)
def run(self):
while not self.done.is_set():
try:
code, packetID, path, data = self._socketIO.recv_packet()
except SocketIOConnectionError, error:
print error
self.cancel()
break
except SocketIOPacketError, error:
print error
continue
try:
namespace = self._namespaceByPath[path]
except KeyError:
print 'Received unexpected path (%s)' % path
continue
try: try:
delegate = { delegate = {
0: self._on_connect, '0': self.on_disconnect,
1: self._on_disconnect, '1': self.on_connect,
2: self._on_event, '2': self.on_heartbeat,
3: self._on_ack, '3': self.on_message,
4: self._on_error, '4': self.on_json,
5: self._on_binary_event, '5': self.on_event,
6: self._on_binary_ack, '6': self.on_ack,
}[socketIO_packet_type] '7': self.on_error,
}[code]
except KeyError: except KeyError:
raise PacketError( print 'Received unexpected code (%s)' % code
'unexpected socket.io packet type (%s)' % socketIO_packet_type) continue
delegate(socketIO_packet_data, namespace) delegate(packetID, namespace._get_eventCallback, data)
return socketIO_packet_data
def _on_connect(self, data, namespace): def on_disconnect(self, packetID, get_eventCallback, data):
namespace._connected = True get_eventCallback('disconnect')()
namespace._find_packet_callback('connect')()
def _on_disconnect(self, data, namespace): def on_connect(self, packetID, get_eventCallback, data):
namespace._connected = False get_eventCallback('connect')()
namespace._find_packet_callback('disconnect')()
def _on_event(self, data, namespace): def on_heartbeat(self, packetID, get_eventCallback, data):
data_parsed = parse_socketIO_packet_data(data) pass
args = data_parsed.args
def on_message(self, packetID, get_eventCallback, data):
args = [data]
if packetID:
args.append(self.get_ackCallback(packetID))
get_eventCallback('message')(*args)
def on_json(self, packetID, get_eventCallback, data):
args = [loads(data)]
if packetID:
args.append(self.get_ackCallback(packetID))
get_eventCallback('message')(*args)
def on_event(self, packetID, get_eventCallback, data):
valueByName = loads(data)
event = valueByName['name']
args = valueByName.get('args', [])
if packetID:
args.append(self.get_ackCallback(packetID))
get_eventCallback(event)(*args)
def on_ack(self, packetID, get_eventCallback, data):
dataParts = data.split('+', 1)
messageID = int(dataParts[0])
args = loads(dataParts[1]) if len(dataParts) > 1 else []
callback = self._socketIO.get_messageCallback(messageID)
if not callback:
return
callback(*args)
if not self._socketIO.has_messageCallback:
self.ready.set()
def on_error(self, packetID, get_eventCallback, data):
reason, advice = data.split('+', 1)
get_eventCallback('error')(reason, advice)
class _SocketIO(object):
'Low-level interface to remove cyclic references in child threads'
messageID = 0
def __init__(self, host, port, secure, headers, proxies):
baseURL = '%s:%d/socket.io/%s' % (host, port, PROTOCOL)
targetScheme = 'https' if secure else 'http'
targetURL = '%s://%s/' % (targetScheme, baseURL)
try: try:
event = args.pop(0) response = requests.get(
except IndexError: targetURL,
raise PacketError('missing event name') headers=headers,
if data_parsed.ack_id is not None: proxies=proxies)
args.append(self._prepare_to_send_ack( except IOError: # pragma: no cover
data_parsed.path, data_parsed.ack_id)) raise SocketIOError('Could not start connection')
namespace._find_packet_callback(event)(*args) if 200 != response.status_code: # pragma: no cover
raise SocketIOError('Could not establish connection')
responseParts = response.text.split(':')
sessionID = responseParts[0]
heartbeatTimeout = int(responseParts[1])
# connectionTimeout = int(responseParts[2])
supportedTransports = responseParts[3].split(',')
if 'websocket' not in supportedTransports:
raise SocketIOError('Could not parse handshake')
socketScheme = 'wss' if secure else 'ws'
socketURL = '%s://%s/websocket/%s' % (socketScheme, baseURL, sessionID)
self.connection = create_connection(socketURL)
self.heartbeatInterval = heartbeatTimeout - 2
self.callbackByMessageID = {}
def _on_ack(self, data, namespace): def __del__(self):
data_parsed = parse_socketIO_packet_data(data) self.disconnect(close=False)
def disconnect(self, path='', close=True):
if not self.connected:
return
if path:
self.send_packet(0, path)
elif close:
self.connection.close()
def connect(self, path):
self.send_packet(1, path)
def send_heartbeat(self):
try: try:
ack_callback = self._get_ack_callback(data_parsed.ack_id) self.send_packet(2)
except SocketIOPacketError:
print 'Could not send heartbeat'
pass
def message(self, data, callback, path):
if isinstance(data, basestring):
code = 3
packetData = data
else:
code = 4
packetData = dumps(data, ensure_ascii=False)
self.send_packet(code, path, packetData, callback)
def emit(self, event, *args, **kw):
callback, args = find_callback(args, kw)
packetData = dumps(dict(name=event, args=args), ensure_ascii=False)
path = kw.get('path', '')
self.send_packet(5, path, packetData, callback)
def ack(self, packetID, *args):
packetID = packetID.rstrip('+')
packetData = '%s+%s' % (
packetID,
dumps(args, ensure_ascii=False),
) if args else packetID
self.send_packet(6, data=packetData)
def set_messageCallback(self, callback):
'Set callback that will be called after receiving an acknowledgment'
self.messageID += 1
self.callbackByMessageID[self.messageID] = callback
return '%s+' % self.messageID
def get_messageCallback(self, messageID):
try:
callback = self.callbackByMessageID[messageID]
del self.callbackByMessageID[messageID]
return callback
except KeyError: except KeyError:
return return
ack_callback(*data_parsed.args)
def _on_error(self, data, namespace):
namespace._find_packet_callback('error')(data)
def _on_binary_event(self, data, namespace):
self._warn('[not implemented] binary event')
def _on_binary_ack(self, data, namespace):
self._warn('[not implemented] binary ack')
def _prepare_to_send_ack(self, path, ack_id):
'Return function that acknowledges the server'
return lambda *args: self._ack(path, ack_id, *args)
def _set_ack_callback(self, callback):
self._ack_id += 1
self._callback_by_ack_id[self._ack_id] = callback
return self._ack_id
def _get_ack_callback(self, ack_id):
return self._callback_by_ack_id.pop(ack_id)
@property @property
def _has_ack_callback(self): def has_messageCallback(self):
return True if self._callback_by_ack_id else False return True if self.callbackByMessageID else False
def recv_packet(self):
try:
packet = self.connection.recv()
except WebSocketConnectionClosedException:
text = 'Lost connection (Connection closed)'
raise SocketIOConnectionError(text)
except socket.timeout:
text = 'Lost connection (Connection timed out)'
raise SocketIOConnectionError(text)
except socket.error:
text = 'Lost connection'
raise SocketIOConnectionError(text)
try:
packetParts = packet.split(':', 3)
except AttributeError:
raise SocketIOPacketError('Received invalid packet (%s)' % packet)
packetCount = len(packetParts)
code, packetID, path, data = None, None, None, None
if 4 == packetCount:
code, packetID, path, data = packetParts
elif 3 == packetCount:
code, packetID, path = packetParts
elif 1 == packetCount:
code = packetParts[0]
return code, packetID, path, data
def send_packet(self, code, path='', data='', callback=None):
packetID = self.set_messageCallback(callback) if callback else ''
packetParts = [str(code), packetID, path, data]
try:
packet = ':'.join(packetParts)
self.connection.send(packet)
except socket.error:
raise SocketIOPacketError('Could not send packet')
@property
def connected(self):
return self.connection.connected
class SocketIOError(Exception):
pass
class SocketIOConnectionError(SocketIOError):
pass
class SocketIOPacketError(SocketIOError):
pass
def find_callback(args, kw=None):
'Return callback whether passed as a last argument or as a keyword'
if args and callable(args[-1]):
return args[-1], args[:-1]
try:
return kw['callback'], args
except (KeyError, TypeError):
return None, args

View file

@ -1,14 +0,0 @@
class SocketIOError(Exception):
pass
class ConnectionError(SocketIOError):
pass
class TimeoutError(SocketIOError):
pass
class PacketError(SocketIOError):
pass

View file

@ -1,52 +0,0 @@
import logging
from threading import Thread, Event
from .exceptions import ConnectionError, TimeoutError
class HeartbeatThread(Thread):
daemon = True
def __init__(
self, send_heartbeat,
relax_interval_in_seconds,
hurry_interval_in_seconds):
super(HeartbeatThread, self).__init__()
self._send_heartbeat = send_heartbeat
self._relax_interval_in_seconds = relax_interval_in_seconds
self._hurry_interval_in_seconds = hurry_interval_in_seconds
self._adrenaline = Event()
self._rest = Event()
self._halt = Event()
def run(self):
try:
while not self._halt.is_set():
try:
self._send_heartbeat()
except TimeoutError:
pass
if self._adrenaline.is_set():
interval_in_seconds = self._hurry_interval_in_seconds
else:
interval_in_seconds = self._relax_interval_in_seconds
self._rest.wait(interval_in_seconds)
except ConnectionError:
logging.debug('[heartbeat connection error]')
def relax(self):
self._adrenaline.clear()
def hurry(self):
self._adrenaline.set()
self._rest.set()
self._rest.clear()
@property
def hurried(self):
return self._adrenaline.is_set()
def halt(self):
self._rest.set()
self._halt.set()

View file

@ -1,42 +0,0 @@
import logging
import time
class LoggingMixin(object):
def _log(self, level, msg, *attrs):
logging.log(level, '%s %s' % (self._log_name, msg), *attrs)
def _debug(self, msg, *attrs):
self._log(logging.DEBUG, msg, *attrs)
def _info(self, msg, *attrs):
self._log(logging.INFO, msg, *attrs)
def _warn(self, msg, *attrs):
self._log(logging.WARNING, msg, *attrs)
def _yield_warning_screen(self, seconds=None):
last_warning = None
for elapsed_time in _yield_elapsed_time(seconds):
try:
yield elapsed_time
except Exception as warning:
warning = str(warning)
if last_warning != warning:
last_warning = warning
self._warn(warning)
time.sleep(1)
def _yield_elapsed_time(seconds=None):
start_time = time.time()
if seconds is None:
while True:
yield _get_elapsed_time(start_time)
while _get_elapsed_time(start_time) < seconds:
yield _get_elapsed_time(start_time)
def _get_elapsed_time(start_time):
return time.time() - start_time

View file

@ -1,224 +0,0 @@
from .logs import LoggingMixin
class EngineIONamespace(LoggingMixin):
'Define engine.io client behavior'
def __init__(self, io):
self._io = io
self._callback_by_event = {}
self._log_name = io._url
self.initialize()
def initialize(self):
"""Initialize custom variables here.
You can override this method."""
def on(self, event, callback):
'Define a callback to handle an event emitted by the server'
self._callback_by_event[event] = callback
def send(self, data):
'Send a message'
self._io.send(data)
def on_open(self):
"""Called after engine.io connects.
You can override this method."""
def on_close(self):
"""Called after engine.io disconnects.
You can override this method."""
def on_ping(self, data):
"""Called after engine.io sends a ping packet.
You can override this method."""
def on_pong(self, data):
"""Called after engine.io sends a pong packet.
You can override this method."""
def on_message(self, data):
"""Called after engine.io sends a message packet.
You can override this method."""
def on_upgrade(self):
"""Called after engine.io sends an upgrade packet.
You can override this method."""
def on_noop(self):
"""Called after engine.io sends a noop packet.
You can override this method."""
def _find_packet_callback(self, event):
# Check callbacks defined by on()
try:
return self._callback_by_event[event]
except KeyError:
pass
# Check callbacks defined explicitly
return getattr(self, 'on_' + event)
class SocketIONamespace(EngineIONamespace):
'Define socket.io client behavior'
def __init__(self, io, path):
self.path = path
super(SocketIONamespace, self).__init__(io)
def connect(self):
self._io.connect(self.path)
def disconnect(self):
self._io.disconnect(self.path)
def emit(self, event, *args, **kw):
self._io.emit(event, path=self.path, *args, **kw)
def send(self, data='', callback=None):
self._io.send(data, callback)
def on_connect(self):
"""Called after socket.io connects.
You can override this method."""
def on_reconnect(self):
"""Called after socket.io reconnects.
You can override this method."""
def on_disconnect(self):
"""Called after socket.io disconnects.
You can override this method."""
def on_event(self, event, *args):
"""
Called if there is no matching event handler.
You can override this method.
There are three ways to define an event handler:
- Call socketIO.on()
socketIO = SocketIO('localhost', 8000)
socketIO.on('my_event', my_function)
- Call namespace.on()
namespace = socketIO.get_namespace()
namespace.on('my_event', my_function)
- Define namespace.on_xxx
class Namespace(SocketIONamespace):
def on_my_event(self, *args):
my_function(*args)
socketIO.define(Namespace)"""
def on_error(self, data):
"""Called after socket.io sends an error packet.
You can override this method."""
def _find_packet_callback(self, event):
# Interpret events
if event == 'connect':
if not hasattr(self, '_was_connected'):
self._was_connected = True
else:
event = 'reconnect'
# Check callbacks defined by on()
try:
return self._callback_by_event[event]
except KeyError:
pass
# Check callbacks defined explicitly or use on_event()
return getattr(
self, 'on_' + event.replace(' ', '_'),
lambda *args: self.on_event(event, *args))
class LoggingEngineIONamespace(EngineIONamespace):
def on_open(self):
self._debug('[engine.io open]')
super(LoggingEngineIONamespace, self).on_open()
def on_close(self):
self._debug('[engine.io close]')
super(LoggingEngineIONamespace, self).on_close()
def on_ping(self, data):
self._debug('[engine.io ping] %s', data)
super(LoggingEngineIONamespace, self).on_ping(data)
def on_pong(self, data):
self._debug('[engine.io pong] %s', data)
super(LoggingEngineIONamespace, self).on_pong(data)
def on_message(self, data):
self._debug('[engine.io message] %s', data)
super(LoggingEngineIONamespace, self).on_message(data)
def on_upgrade(self):
self._debug('[engine.io upgrade]')
super(LoggingEngineIONamespace, self).on_upgrade()
def on_noop(self):
self._debug('[engine.io noop]')
super(LoggingEngineIONamespace, self).on_noop()
def on_event(self, event, *args):
callback, args = find_callback(args)
arguments = [repr(_) for _ in args]
if callback:
arguments.append('callback(*args)')
self._info('[engine.io event] %s(%s)', event, ', '.join(arguments))
super(LoggingEngineIONamespace, self).on_event(event, *args)
class LoggingSocketIONamespace(SocketIONamespace, LoggingEngineIONamespace):
def on_connect(self):
self._debug(
'%s[socket.io connect]', _make_logging_header(self.path))
super(LoggingSocketIONamespace, self).on_connect()
def on_reconnect(self):
self._debug(
'%s[socket.io reconnect]', _make_logging_header(self.path))
super(LoggingSocketIONamespace, self).on_reconnect()
def on_disconnect(self):
self._debug(
'%s[socket.io disconnect]', _make_logging_header(self.path))
super(LoggingSocketIONamespace, self).on_disconnect()
def on_event(self, event, *args):
callback, args = find_callback(args)
arguments = [repr(_) for _ in args]
if callback:
arguments.append('callback(*args)')
self._info(
'%s[socket.io event] %s(%s)', _make_logging_header(self.path),
event, ', '.join(arguments))
super(LoggingSocketIONamespace, self).on_event(event, *args)
def on_error(self, data):
self._debug(
'%s[socket.io error] %s', _make_logging_header(self.path), data)
super(LoggingSocketIONamespace, self).on_error()
def find_callback(args, kw=None):
'Return callback whether passed as a last argument or as a keyword'
if args and callable(args[-1]):
return args[-1], args[:-1]
try:
return kw['callback'], args
except (KeyError, TypeError):
return None, args
def _make_logging_header(path):
return path + ' ' if path else ''

View file

@ -1,137 +0,0 @@
import json
from collections import namedtuple
from .symmetries import (
decode_string, encode_string, get_byte, get_character, parse_url)
EngineIOSession = namedtuple('EngineIOSession', [
'id', 'ping_interval', 'ping_timeout', 'transport_upgrades'])
SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'args'])
def parse_host(host, port, resource):
if not host.startswith('http'):
host = 'http://' + host
url_pack = parse_url(host)
is_secure = url_pack.scheme == 'https'
port = port or url_pack.port or (443 if is_secure else 80)
url = '%s:%d%s/%s' % (url_pack.hostname, port, url_pack.path, resource)
return is_secure, url
def parse_engineIO_session(engineIO_packet_data):
d = json.loads(decode_string(engineIO_packet_data))
return EngineIOSession(
id=d['sid'],
ping_interval=d['pingInterval'] / float(1000),
ping_timeout=d['pingTimeout'] / float(1000),
transport_upgrades=d['upgrades'])
def encode_engineIO_content(engineIO_packets):
content = bytearray()
for packet_type, packet_data in engineIO_packets:
packet_text = format_packet_text(packet_type, packet_data)
content.extend(_make_packet_prefix(packet_text) + packet_text)
return content
def decode_engineIO_content(content):
content_index = 0
content_length = len(content)
while content_index < content_length:
try:
content_index, packet_length = _read_packet_length(
content, content_index)
except IndexError:
break
content_index, packet_text = _read_packet_text(
content, content_index, packet_length)
engineIO_packet_type, engineIO_packet_data = parse_packet_text(
packet_text)
yield engineIO_packet_type, engineIO_packet_data
def format_socketIO_packet_data(path=None, ack_id=None, args=None):
socketIO_packet_data = json.dumps(args, ensure_ascii=False) if args else ''
if ack_id is not None:
socketIO_packet_data = str(ack_id) + socketIO_packet_data
if path:
socketIO_packet_data = path + ',' + socketIO_packet_data
return socketIO_packet_data
def parse_socketIO_packet_data(socketIO_packet_data):
data = decode_string(socketIO_packet_data)
if data.startswith('/'):
try:
path, data = data.split(',', 1)
except ValueError:
path = data
data = ''
else:
path = ''
try:
ack_id_string, data = data.split('[', 1)
data = '[' + data
ack_id = int(ack_id_string)
except (ValueError, IndexError):
ack_id = None
try:
args = json.loads(data)
except ValueError:
args = []
return SocketIOData(path=path, ack_id=ack_id, args=args)
def format_packet_text(packet_type, packet_data):
return encode_string(str(packet_type) + packet_data)
def parse_packet_text(packet_text):
packet_type = int(get_character(packet_text, 0))
packet_data = packet_text[1:]
return packet_type, packet_data
def get_namespace_path(socketIO_packet_data):
if not socketIO_packet_data.startswith(b'/'):
return ''
# Loop incrementally in case there is binary data
parts = []
for i in range(len(socketIO_packet_data)):
character = get_character(socketIO_packet_data, i)
if ',' == character:
break
parts.append(character)
return ''.join(parts)
def _make_packet_prefix(packet):
length_string = str(len(packet))
header_digits = bytearray([0])
for i in range(len(length_string)):
header_digits.append(ord(length_string[i]) - 48)
header_digits.append(255)
return header_digits
def _read_packet_length(content, content_index):
while get_byte(content, content_index) != 0:
content_index += 1
content_index += 1
packet_length_string = ''
byte = get_byte(content, content_index)
while byte != 255:
packet_length_string += str(byte)
content_index += 1
byte = get_byte(content, content_index)
return content_index, int(packet_length_string)
def _read_packet_text(content, content_index, packet_length):
while get_byte(content, content_index) == 255:
content_index += 1
packet_text = content[content_index:content_index + packet_length]
return content_index + packet_length, packet_text

View file

@ -1,29 +0,0 @@
import six
try:
from urllib import urlencode as format_query
except ImportError:
from urllib.parse import urlencode as format_query
try:
from urlparse import urlparse as parse_url
except ImportError:
from urllib.parse import urlparse as parse_url
try:
memoryview = memoryview
except NameError:
memoryview = buffer
def get_character(x, index):
return chr(get_byte(x, index))
def get_byte(x, index):
return six.indexbytes(x, index)
def encode_string(x):
return x.encode('utf-8')
def decode_string(x):
return x.decode('utf-8')

174
socketIO_client/tests.py Normal file
View file

@ -0,0 +1,174 @@
from socketIO_client import SocketIO, BaseNamespace, find_callback
from unittest import TestCase
HOST = 'localhost'
PORT = 8000
DATA = 'xxx'
PAYLOAD = {'xxx': 'yyy'}
class TestSocketIO(TestCase):
def setUp(self):
self.socketIO = SocketIO(HOST, PORT)
self.called_on_response = False
def tearDown(self):
del self.socketIO
def on_response(self, *args):
self.called_on_response = True
for arg in args:
if isinstance(arg, dict):
self.assertEqual(arg, PAYLOAD)
else:
self.assertEqual(arg, DATA)
def is_connected(self, socketIO, connected):
childThreads = [
socketIO._rhythmicThread,
socketIO._listenerThread,
]
for childThread in childThreads:
self.assertEqual(not connected, childThread.done.is_set())
self.assertEqual(connected, socketIO.connected)
def test_disconnect(self):
'Terminate child threads after disconnect'
self.is_connected(self.socketIO, True)
self.socketIO.disconnect()
self.is_connected(self.socketIO, False)
# Use context manager
with SocketIO(HOST, PORT) as self.socketIO:
self.is_connected(self.socketIO, True)
self.is_connected(self.socketIO, False)
def test_message(self):
'Message'
self.socketIO.define(Namespace)
self.socketIO.message()
self.socketIO.wait(0.1)
namespace = self.socketIO.get_namespace()
self.assertEqual(namespace.response, 'message_response')
def test_message_with_data(self):
'Message with data'
self.socketIO.define(Namespace)
self.socketIO.message(DATA)
self.socketIO.wait(0.1)
namespace = self.socketIO.get_namespace()
self.assertEqual(namespace.response, DATA)
def test_message_with_payload(self):
'Message with payload'
self.socketIO.define(Namespace)
self.socketIO.message(PAYLOAD)
self.socketIO.wait(0.1)
namespace = self.socketIO.get_namespace()
self.assertEqual(namespace.response, PAYLOAD)
def test_message_with_callback(self):
'Message with callback'
self.socketIO.message(callback=self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_message_with_callback_with_data(self):
'Message with callback with data'
self.socketIO.message(DATA, self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_emit(self):
'Emit'
self.socketIO.define(Namespace)
self.socketIO.emit('emit')
self.socketIO.wait(0.1)
self.assertEqual(self.socketIO.get_namespace().argsByEvent, {
'emit_response': (),
})
def test_emit_with_payload(self):
'Emit with payload'
self.socketIO.define(Namespace)
self.socketIO.emit('emit_with_payload', PAYLOAD)
self.socketIO.wait(0.1)
self.assertEqual(self.socketIO.get_namespace().argsByEvent, {
'emit_with_payload_response': (PAYLOAD,),
})
def test_emit_with_multiple_payloads(self):
'Emit with multiple payloads'
self.socketIO.define(Namespace)
self.socketIO.emit('emit_with_multiple_payloads', PAYLOAD, PAYLOAD)
self.socketIO.wait(0.1)
self.assertEqual(self.socketIO.get_namespace().argsByEvent, {
'emit_with_multiple_payloads_response': (PAYLOAD, PAYLOAD),
})
def test_emit_with_callback(self):
'Emit with callback'
self.socketIO.emit('emit_with_callback', self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_emit_with_callback_with_payload(self):
'Emit with callback with payload'
self.socketIO.emit('emit_with_callback_with_payload',
self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_emit_with_callback_with_multiple_payloads(self):
'Emit with callback with multiple payloads'
self.socketIO.emit('emit_with_callback_with_multiple_payloads',
self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_emit_with_event(self):
'Emit to trigger an event'
self.socketIO.on('emit_with_event_response', self.on_response)
self.socketIO.emit('emit_with_event', PAYLOAD)
self.socketIO.wait_for_callbacks(0.1)
self.assertEqual(self.called_on_response, True)
def test_ack(self):
'Trigger server callback'
self.socketIO.define(Namespace)
self.socketIO.emit('ack', PAYLOAD)
self.socketIO.wait(0.1)
self.assertEqual(self.socketIO.get_namespace().argsByEvent, {
'ack_response': (PAYLOAD,),
'ack_callback_response': (PAYLOAD,),
})
def test_namespaces(self):
'Behave differently in different namespaces'
mainNamespace = self.socketIO.define(Namespace)
chatNamespace = self.socketIO.define(Namespace, '/chat')
newsNamespace = self.socketIO.define(Namespace, '/news')
newsNamespace.emit('emit_with_payload', PAYLOAD)
self.socketIO.wait(0.1)
self.assertEqual(mainNamespace.argsByEvent, {})
self.assertEqual(chatNamespace.argsByEvent, {})
self.assertEqual(newsNamespace.argsByEvent, {
'emit_with_payload_response': (PAYLOAD,),
})
class Namespace(BaseNamespace):
def initialize(self):
self.response = None
self.argsByEvent = {}
def on_message(self, data):
self.response = data
def on_event(self, event, *args):
callback, args = find_callback(args)
if callback:
callback(*args)
self.argsByEvent[event] = args

View file

@ -1,192 +0,0 @@
import logging
import time
from unittest import TestCase
from .. import SocketIO, LoggingNamespace, find_callback
HOST = 'localhost'
PORT = 9000
DATA = 'xxx'
PAYLOAD = {'xxx': 'yyy'}
logging.basicConfig(level=logging.DEBUG)
class BaseMixin(object):
def setUp(self):
super(BaseMixin, self).setUp()
self.called_on_response = False
self.wait_time_in_seconds = 1
def tearDown(self):
super(BaseMixin, self).tearDown()
self.socketIO.disconnect()
def test_disconnect(self):
'Disconnect'
namespace = self.socketIO.define(Namespace)
self.assertTrue(self.socketIO.connected)
self.assertFalse(namespace.called_on_disconnect)
self.socketIO.disconnect()
self.assertTrue(namespace.called_on_disconnect)
self.assertFalse(self.socketIO.connected)
def test_emit(self):
'Emit'
namespace = self.socketIO.define(Namespace)
self.socketIO.emit('emit')
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.args_by_event, {
'emit_response': (),
})
def test_emit_with_payload(self):
'Emit with payload'
namespace = self.socketIO.define(Namespace)
self.socketIO.emit('emit_with_payload', PAYLOAD)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.args_by_event, {
'emit_with_payload_response': (PAYLOAD,),
})
def test_emit_with_multiple_payloads(self):
'Emit with multiple payloads'
namespace = self.socketIO.define(Namespace)
self.socketIO.emit('emit_with_multiple_payloads', PAYLOAD, PAYLOAD)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.args_by_event, {
'emit_with_multiple_payloads_response': (PAYLOAD, PAYLOAD),
})
def test_emit_with_callback(self):
'Emit with callback'
self.socketIO.emit('emit_with_callback', self.on_response)
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
self.assertTrue(self.called_on_response)
def test_emit_with_callback_with_payload(self):
'Emit with callback with payload'
self.socketIO.emit(
'emit_with_callback_with_payload', self.on_response)
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
self.assertTrue(self.called_on_response)
def test_emit_with_callback_with_multiple_payloads(self):
'Emit with callback with multiple payloads'
self.socketIO.emit(
'emit_with_callback_with_multiple_payloads', self.on_response)
self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds)
self.assertTrue(self.called_on_response)
def test_emit_with_event(self):
'Emit to trigger an event'
self.socketIO.on('emit_with_event_response', self.on_response)
self.socketIO.emit('emit_with_event', PAYLOAD)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertTrue(self.called_on_response)
def test_send(self):
'Send'
namespace = self.socketIO.define(Namespace)
self.socketIO.send()
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.response, 'message_response')
def test_send_with_data(self):
'Send with data'
namespace = self.socketIO.define(Namespace)
self.socketIO.send(DATA)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.response, DATA)
def test_ack(self):
'Respond to a server callback request'
namespace = self.socketIO.define(Namespace)
self.socketIO.emit('trigger_server_expects_callback', PAYLOAD)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(namespace.args_by_event, {
'server_expects_callback': (PAYLOAD,),
'server_received_callback': (PAYLOAD,),
})
def test_wait_with_disconnect(self):
'Exit loop when the client wants to disconnect'
self.socketIO.define(Namespace)
self.socketIO.disconnect()
timeout_in_seconds = 5
start_time = time.time()
self.socketIO.wait(timeout_in_seconds)
self.assertTrue(time.time() - start_time < timeout_in_seconds)
def test_namespace_emit(self):
'Behave differently in different namespaces'
main_namespace = self.socketIO.define(Namespace)
chat_namespace = self.socketIO.define(Namespace, '/chat')
news_namespace = self.socketIO.define(Namespace, '/news')
news_namespace.emit('emit_with_payload', PAYLOAD)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(main_namespace.args_by_event, {})
self.assertEqual(chat_namespace.args_by_event, {})
self.assertEqual(news_namespace.args_by_event, {
'emit_with_payload_response': (PAYLOAD,),
})
def test_namespace_ack(self):
'Respond to a server callback request within a namespace'
chat_namespace = self.socketIO.define(Namespace, '/chat')
chat_namespace.emit('trigger_server_expects_callback', PAYLOAD)
self.socketIO.wait(self.wait_time_in_seconds)
self.assertEqual(chat_namespace.args_by_event, {
'server_expects_callback': (PAYLOAD,),
'server_received_callback': (PAYLOAD,),
})
def on_response(self, *args):
for arg in args:
if isinstance(arg, dict):
self.assertEqual(arg, PAYLOAD)
else:
self.assertEqual(arg, DATA)
self.called_on_response = True
class Test_XHR_PollingTransport(BaseMixin, TestCase):
def setUp(self):
super(Test_XHR_PollingTransport, self).setUp()
self.socketIO = SocketIO(HOST, PORT, LoggingNamespace, transports=[
'xhr-polling'], verify=False)
self.assertEqual(self.socketIO.transport_name, 'xhr-polling')
class Test_WebsocketTransport(BaseMixin, TestCase):
def setUp(self):
super(Test_WebsocketTransport, self).setUp()
self.socketIO = SocketIO(HOST, PORT, LoggingNamespace, transports=[
'xhr-polling', 'websocket'], verify=False)
self.assertEqual(self.socketIO.transport_name, 'websocket')
class Namespace(LoggingNamespace):
def initialize(self):
self.called_on_disconnect = False
self.args_by_event = {}
self.response = None
def on_disconnect(self):
self.called_on_disconnect = True
def on_wait_with_disconnect_response(self):
self.disconnect()
def on_event(self, event, *args):
callback, args = find_callback(args)
if callback:
callback(*args)
self.args_by_event[event] = args
def on_message(self, data):
self.response = data

View file

@ -1,25 +0,0 @@
<script src="/socket.io/socket.io.js"></script>
<script>
var socket = io('//localhost');
var chat = io('/chat');
var news = io('/news');
socket.on('server_expects_callback', function(payload, fn) {fn(payload)});
socket.emit('trigger_server_expects_callback', 'whee');
socket.emit('emit');
socket.emit('emit_with_payload');
socket.emit('emit_with_multiple_payloads', 'aaa', 'bbb');
socket.emit('emit_with_callback', function() {console.log('whee')});
socket.emit('emit_with_callback_with_payload', function(x) {console.log('whee ' + x)});
socket.emit('emit_with_callback_with_multiple_payloads', function(x, y) {console.log('whee ' + x + ' ' + y)});
socket.emit('emit_with_event');
socket.emit('aaa');
chat.on('server_expects_callback', function(payload, fn) {fn(payload)});
chat.emit('trigger_server_expects_callback', 'whee');
chat.emit('emit_with_payload');
chat.emit('aaa');
news.emit('emit_with_payload');
news.emit('aaa');
</script>

View file

@ -1,36 +0,0 @@
var proxy = require('http-proxy').createProxyServer({
target: {host: 'localhost', port: 9000}
}).on('error', function(err, req, res) {
console.log('[ERROR] %s', err);
res.end();
});
var server = require('http').createServer(function(req, res) {
console.log('[REQUEST.%s] %s', req.method, req.url);
console.log(req['headers']);
if (req.method == 'POST') {
var body = '';
req.on('data', function (data) {
body += data;
});
req.on('end', function () {
print_body('[REQUEST.BODY] ', body);
});
}
var write = res.write;
res.write = function(data) {
print_body('[RESPONSE.BODY] ', data);
write.call(res, data);
}
proxy.web(req, res);
});
function print_body(header, body) {
var text = String(body);
console.log(header + text);
if (text.charCodeAt(0) != 0) return;
for (var i = 0; i < text.length; i++) {
var character_code = text.charCodeAt(i);
console.log('body[%s] = %s = %s', i, text[i], character_code);
if (character_code == 65533) break;
}
}
server.listen(8000);

View file

@ -1,98 +0,0 @@
// DEBUG=* node serve.js
var argv = require('yargs').argv;
if (argv.secure) {
var fs = require('fs');
var path = require('path');
var app = require('https').createServer({
key: fs.readFileSync(path.resolve(__dirname, 'ssl.key')),
cert: fs.readFileSync(path.resolve(__dirname, 'ssl.crt'))
}, serve);
} else {
var app = require('http').createServer(serve);
}
app.listen(9000);
var io = require('socket.io')(app);
var PAYLOAD = {'xxx': 'yyy'};
io.on('connection', function(socket) {
socket.on('message', function(data, fn) {
if (fn) {
// Client requests callback
if (data) {
fn(data);
} else {
fn();
}
} else if (typeof data === 'object') {
// Data has type object or is null
socket.json.send(data ? data : 'message_response');
} else {
// Data has type string or is ''
socket.send(data ? data : 'message_response');
}
});
socket.on('emit', function() {
socket.emit('emit_response');
});
socket.on('emit_with_payload', function(payload) {
socket.emit('emit_with_payload_response', payload);
});
socket.on('emit_with_multiple_payloads', function(payload1, payload2) {
socket.emit('emit_with_multiple_payloads_response', payload1, payload2);
});
socket.on('emit_with_callback', function(fn) {
fn();
});
socket.on('emit_with_callback_with_payload', function(fn) {
fn(PAYLOAD);
});
socket.on('emit_with_callback_with_multiple_payloads', function(fn) {
fn(PAYLOAD, PAYLOAD);
});
socket.on('emit_with_event', function(payload) {
socket.emit('emit_with_event_response', payload);
});
socket.on('trigger_server_expects_callback', function(payload) {
socket.emit('server_expects_callback', payload, function(payload) {
socket.emit('server_received_callback', payload);
});
});
socket.on('aaa', function() {
socket.emit('aaa_response', PAYLOAD);
});
socket.on('bbb', function(payload, fn) {
if (fn) fn(payload);
});
});
io.of('/chat').on('connection', function(socket) {
socket.on('emit_with_payload', function(payload) {
socket.emit('emit_with_payload_response', payload);
});
socket.on('aaa', function() {
socket.emit('aaa_response', 'in chat');
});
socket.on('trigger_server_expects_callback', function(payload) {
socket.emit('server_expects_callback', payload, function(payload) {
socket.emit('server_received_callback', payload);
});
});
});
io.of('/news').on('connection', function(socket) {
socket.on('emit_with_payload', function(payload) {
socket.emit('emit_with_payload_response', payload);
});
socket.on('aaa', function() {
socket.emit('aaa_response', 'in news');
});
});
function serve(req, res) {
fs.readFile(__dirname + '/index.html', function(err, data) {
res.writeHead(200);
res.end(data);
});
}

View file

@ -1,21 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDZTCCAk2gAwIBAgIJAK1HKQ8zF3cCMA0GCSqGSIb3DQEBBQUAMEkxCzAJBgNV
BAYTAlVTMQswCQYDVQQIDAJOWTELMAkGA1UEBwwCTlkxDDAKBgNVBAoMA1hZWjES
MBAGA1UEAwwJbG9jYWxob3N0MB4XDTE1MDQxNTE5NDUwNFoXDTE2MDQxNDE5NDUw
NFowSTELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAk5ZMQswCQYDVQQHDAJOWTEMMAoG
A1UECgwDWFlaMRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUA
A4IBDwAwggEKAoIBAQDAQUM9+xbiDeJXg+7X6HgXwla2AnGKWbZ11hZZUYbQwHyq
ABDSqRQXVWvzac6b59/trZiJ7cQEH4+c8ln1C4qbCLvr1aWkL1BDAtSbFUFhQ2Sb
R/xkSUpq35yTuR5+oHgahDg1gbgXgPhB3Y6HoBlYMSpSUKF+INu354kxfYi0t4tP
8f309KUe6eQH3gXgTBR7pPJEUpaPOsrk6UR3cHCMqyzHulyfhgvkk5FN+EtSR9ex
dIrF6WXmfynhsAa/+bxbsgeBF9MNj3zvckCzxdQStdqOvy0mu40/7i9vwguh9cRo
HDn6lx5EaE+gSGU48UNnKX5iQdqEhprNVDj31MiJAgMBAAGjUDBOMB0GA1UdDgQW
BBRkFsPxYU+e6ZSFwmzoS45qiOzAaDAfBgNVHSMEGDAWgBRkFsPxYU+e6ZSFwmzo
S45qiOzAaDAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4IBAQB4JyOA5bZ3
NbkMvOjpDw+tKzcNiXZIkGfoQ8NC1DbGVhgG7Ps4VjXgUB552YSUV7iwyd3G78OC
+cxEcr+BvxXHXL2Wlxy0c/ZgBjRI5VnGbYQjjI2Iy2qJV+x5IR2oZatv45soZSLq
NFCg2KpOgcSRgs0oDGVBYO0d9m73s/kOySj2NGqVJsaQXqXtLWKnqToaCfl4Vnl+
zcMdUv8ajBZEPRg6oNi2QIvcNT8fS5gd/T4OXBa7pYuC79yOZ1X6bkKsZrcAdNGM
zO/jH6jKFjIBBx1Of+uZTzfAj/eoTu3foPuUQ+Z9NNE2nkE6SLyBSlxE7wD+SfjS
4/J0PNj22Uh3
-----END CERTIFICATE-----

View file

@ -1,28 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDAQUM9+xbiDeJX
g+7X6HgXwla2AnGKWbZ11hZZUYbQwHyqABDSqRQXVWvzac6b59/trZiJ7cQEH4+c
8ln1C4qbCLvr1aWkL1BDAtSbFUFhQ2SbR/xkSUpq35yTuR5+oHgahDg1gbgXgPhB
3Y6HoBlYMSpSUKF+INu354kxfYi0t4tP8f309KUe6eQH3gXgTBR7pPJEUpaPOsrk
6UR3cHCMqyzHulyfhgvkk5FN+EtSR9exdIrF6WXmfynhsAa/+bxbsgeBF9MNj3zv
ckCzxdQStdqOvy0mu40/7i9vwguh9cRoHDn6lx5EaE+gSGU48UNnKX5iQdqEhprN
VDj31MiJAgMBAAECggEANAFzbxC83+lhkMrfkQgRdFvdmN6QWBxsfvOql/61uUJY
dqQN6O5TwPwad33npcTTjjenS6hFndfrwUjNjLvSgp2aN/FTHVavH3FkkY7uYKEa
VebjHz20I7TZZhxtY1OFKacajV7JrZH1lduY8pccQ/8Is7ub88JvrQ+0zO5oTHnh
KEPYY5r2wLxKrzGm0NavRW9MpiHxz1vUGykvaGq9vR8dVFvZlLC5szYII+BvlII+
78XMnZbJ9ahT7dzfnzPdPPuyP3m4cdJ9c+7Advs0g2F3K/IDL3jZZCRZIaLxHIs0
PeI17teW0OmK4RWrnf6dSf0bww05x5In8GzUYgppAQKBgQD4lJVi3UmAB319CGeP
NE4cZFuMneNsuCkNEJONb8Cfsah2maM0if8tUNoR96JorjWgkUTG4oThGSQgJQw8
fPy6cW4EUhSvWCO+Q4MFFWpTcf0hBiz5O1d06FHVo39o8Ct9dv2bxJqfNtCUUf31
Fz5tvA+wvByOSazUC3AowQZ6FwKBgQDF/ksJbOBd/bu3ss7eJRjE2sXmuhfxrUiu
P5RoOEqHROAifatJk/3hwT6lx2y1g+3kJpZm9V16dNTkcuybL0yJ/VBE3uWuodrj
i9+wcg8XSnRp3BPVKzebKIKDTMdypOeb1f5yhx6cCtChRm1frKQdoXpMQqptM0jq
w3B4bryWXwKBgQCWSv+nLrPpvJ2aoyI56x3u/J59fliquw3W4FbWBOMpqnh4fJu4
gFbQRzoR8u82611xH2O9++brUhANf1jOmaMT9tDVu+rVuSyjNJ5azH/kw96PwPQg
HEjcXjpcOOYnxE4HJZJgQ5ZY/QNPKeOp88vC/RlfedyqCtF7ww6lFU+dMQKBgQC2
M7ut4sne9R8If74rZAwVLBauq1ZZi1O1NsFF33eGX/W7B9bXER+z3vfd61W4/L2x
FWmXOflaNaWsza27aZ2P5tM1bcIEIOKkQBYL9Aq7LkNPH74Ij4rOeEsStVddwy94
k0di8cFTbAhuQbdpMiCdO/qlrzvS3j0d/djEm3NlFQKBgQCpIrHaMcckCFsf2Y6o
zMnbi3859hve94OOJjauQLlw/nRE/+OaDsDN8iJoxnK0seek8ro1ixSBTScpuX8W
G2DBgqs9NrSQLe6FAckkGqVJdluoh5GewNneAcowkkauj2srnb6XtJDhFtTDY141
EPbeqGB9PUY9Ny8VzHkAb1vi6g==
-----END PRIVATE KEY-----

View file

@ -1,200 +0,0 @@
import requests
import six
import socket
import ssl
import sys
import threading
import time
import websocket
from six import string_types
from .exceptions import ConnectionError, TimeoutError
from .parsers import (
encode_engineIO_content, decode_engineIO_content,
format_packet_text, parse_packet_text)
from .symmetries import format_query, memoryview, parse_url
if not hasattr(websocket, 'create_connection'):
sys.exit("""\
An incompatible websocket library is conflicting with the one we need.
You can remove the incompatible library and install the correct one
by running the following commands:
yes | pip uninstall websocket websocket-client
pip install -U websocket-client""")
ENGINEIO_PROTOCOL = 3
TRANSPORTS = 'xhr-polling', 'websocket'
class AbstractTransport(object):
def __init__(self, http_session, is_secure, url, engineIO_session=None):
self.http_session = http_session
self.is_secure = is_secure
self.url = url
self.engineIO_session = engineIO_session
def recv_packet(self):
pass
def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
pass
def set_timeout(self, seconds=None):
pass
class XHR_PollingTransport(AbstractTransport):
def __init__(self, http_session, is_secure, url, engineIO_session=None):
super(XHR_PollingTransport, self).__init__(
http_session, is_secure, url, engineIO_session)
self._params = {
'EIO': ENGINEIO_PROTOCOL, 'transport': 'polling'}
if engineIO_session:
self._request_index = 1
self._kw_get = dict(
timeout=engineIO_session.ping_timeout)
self._kw_post = dict(
timeout=engineIO_session.ping_timeout,
headers={'content-type': 'application/octet-stream'})
self._params['sid'] = engineIO_session.id
else:
self._request_index = 0
self._kw_get = {}
self._kw_post = {}
http_scheme = 'https' if is_secure else 'http'
self._http_url = '%s://%s/' % (http_scheme, url)
self._request_index_lock = threading.Lock()
self._send_packet_lock = threading.Lock()
def recv_packet(self):
params = dict(self._params)
params['t'] = self._get_timestamp()
response = get_response(
self.http_session.get,
self._http_url,
params=params,
**self._kw_get)
for engineIO_packet in decode_engineIO_content(response.content):
engineIO_packet_type, engineIO_packet_data = engineIO_packet
yield engineIO_packet_type, engineIO_packet_data
def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
with self._send_packet_lock:
params = dict(self._params)
params['t'] = self._get_timestamp()
data = encode_engineIO_content([
(engineIO_packet_type, engineIO_packet_data),
])
response = get_response(
self.http_session.post,
self._http_url,
params=params,
data=memoryview(data),
**self._kw_post)
assert response.content == b'ok'
def _get_timestamp(self):
with self._request_index_lock:
timestamp = '%s-%s' % (
int(time.time() * 1000), self._request_index)
self._request_index += 1
return timestamp
class WebsocketTransport(AbstractTransport):
def __init__(self, http_session, is_secure, url, engineIO_session=None):
super(WebsocketTransport, self).__init__(
http_session, is_secure, url, engineIO_session)
params = dict(http_session.params, **{
'EIO': ENGINEIO_PROTOCOL, 'transport': 'websocket'})
request = http_session.prepare_request(requests.Request('GET', url))
kw = {'header': ['%s: %s' % x for x in request.headers.items()]}
if engineIO_session:
params['sid'] = engineIO_session.id
kw['timeout'] = self._timeout = engineIO_session.ping_timeout
ws_url = '%s://%s/?%s' % (
'wss' if is_secure else 'ws', url, format_query(params))
http_scheme = 'https' if is_secure else 'http'
if http_scheme in http_session.proxies: # Use the correct proxy
proxy_url_pack = parse_url(http_session.proxies[http_scheme])
kw['http_proxy_host'] = proxy_url_pack.hostname
kw['http_proxy_port'] = proxy_url_pack.port
if proxy_url_pack.username:
kw['http_proxy_auth'] = (
proxy_url_pack.username, proxy_url_pack.password)
if http_session.verify:
if http_session.cert: # Specify certificate path on disk
if isinstance(http_session.cert, string_types):
kw['ca_certs'] = http_session.cert
else:
kw['ca_certs'] = http_session.cert[0]
else: # Do not verify the SSL certificate
kw['sslopt'] = {'cert_reqs': ssl.CERT_NONE}
try:
self._connection = websocket.create_connection(ws_url, **kw)
except Exception as e:
raise ConnectionError(e)
def recv_packet(self):
try:
packet_text = self._connection.recv()
except websocket.WebSocketTimeoutException as e:
raise TimeoutError('recv timed out (%s)' % e)
except websocket.SSLError as e:
raise ConnectionError('recv disconnected by SSL (%s)' % e)
except websocket.WebSocketConnectionClosedException as e:
raise ConnectionError('recv disconnected (%s)' % e)
except socket.error as e:
raise ConnectionError('recv disconnected (%s)' % e)
engineIO_packet_type, engineIO_packet_data = parse_packet_text(
six.b(packet_text))
yield engineIO_packet_type, engineIO_packet_data
def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
packet = format_packet_text(engineIO_packet_type, engineIO_packet_data)
try:
self._connection.send(packet)
except websocket.WebSocketTimeoutException as e:
raise TimeoutError('send timed out (%s)' % e)
except socket.error as e:
raise ConnectionError('send disconnected (%s)' % e)
except websocket.WebSocketConnectionClosedException as e:
raise ConnectionError('send disconnected (%s)' % e)
def set_timeout(self, seconds=None):
self._connection.settimeout(seconds or self._timeout)
def get_response(request, *args, **kw):
try:
response = request(*args, stream=True, **kw)
except requests.exceptions.Timeout as e:
raise TimeoutError(e)
except requests.exceptions.ConnectionError as e:
raise ConnectionError(e)
except requests.exceptions.SSLError as e:
raise ConnectionError('could not negotiate SSL (%s)' % e)
status_code = response.status_code
if 200 != status_code:
raise ConnectionError('unexpected status code (%s %s)' % (
status_code, response.text))
return response
def prepare_http_session(kw):
http_session = requests.Session()
http_session.headers.update(kw.get('headers', {}))
http_session.auth = kw.get('auth')
http_session.proxies.update(kw.get('proxies', {}))
http_session.hooks.update(kw.get('hooks', {}))
http_session.params.update(kw.get('params', {}))
http_session.verify = kw.get('verify', True)
http_session.cert = kw.get('cert')
http_session.cookies.update(kw.get('cookies', {}))
return http_session