Merged 0.4

This commit is contained in:
Roy Hyunjin Han 2013-04-26 09:42:16 -07:00
commit 4f7185b074
11 changed files with 673 additions and 388 deletions

9
.gitignore vendored
View file

@ -1,8 +1,9 @@
*~
*.sw[op]
*.py[cod]
*.egg
*.egg-info
*.pyc
*.swo
*.swp
.coverage
build
dist
sdist
.coverage

View file

@ -1,8 +1,14 @@
0.4
---
- Added support for server-side callbacks thanks to Zac Lee
- Added low-level _SocketIO to remove cyclic references
- Merged Channel functionality into BaseNamespace thanks to Alexandre Bourget
0.3
---
- Added support for secure connections
- Added socketIO.wait()
- Improved exception handling in heartbeatThread and namespaceThread
- Improved exception handling in _RhythmicThread and _ListenerThread
0.2
---

View file

@ -1,4 +1,4 @@
Copyright (c) 2012 Roy Hyunjin Han and contributors
Copyright (c) 2013 Roy Hyunjin Han and contributors
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

View file

@ -2,12 +2,6 @@ socketIO-client
===============
Here is a socket.io_ client library for Python. You can use it to write test code for your socket.io_ server.
Thanks to rod_ for the `StackOverflow question and answer`__ on which this code is based.
Thanks to liris_ for websocket-client_ and to guille_ for the `socket.io specification`_.
Thanks to `Paul Kienzle`_, `Josh VanderLinden`_, `Ian Fitzpatrick`_ for submitting code to expand support of the socket.io protocol.
Installation
------------
@ -22,7 +16,7 @@ Installation
source $VIRTUAL_ENV/bin/activate
# Install package
easy_install -U socketIO-client
pip install -U socketIO-client
Usage
@ -36,31 +30,32 @@ Emit. ::
from socketIO_client import SocketIO
socketIO = SocketIO('localhost', 8000)
socketIO.emit('aaa', {'bbb': 'ccc'})
socketIO.wait(seconds=1)
with SocketIO('localhost', 8000) as socketIO:
socketIO.emit('aaa')
socketIO.wait(seconds=1)
Emit with callback. ::
from socketIO_client import SocketIO
def on_response(*args):
print args
def on_bbb_response(*args):
print 'on_bbb_response', args
socketIO = SocketIO('localhost', 8000)
socketIO.emit('aaa', {'bbb': 'ccc'}, on_response)
socketIO.wait(forCallbacks=True)
with SocketIO('localhost', 8000) as socketIO:
socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response)
socketIO.wait_for_callbacks(seconds=1)
Define events. ::
from socketIO_client import SocketIO
def on_ddd(*args):
print args
def on_aaa_response(*args):
print 'on_aaa_response', args
socketIO = SocketIO('localhost', 8000)
socketIO.on('ddd', on_ddd)
socketIO.wait()
socketIO.on('aaa_response', on_aaa_response)
socketIO.emit('aaa')
socketIO.wait(seconds=1)
Define events in a namespace. ::
@ -68,11 +63,14 @@ Define events in a namespace. ::
class Namespace(BaseNamespace):
def on_ddd(self, *args):
self.socketIO.emit('eee', {'fff': 'ggg'})
def on_aaa_response(self, *args):
print 'on_aaa_response', args
self.emit('bbb')
socketIO = SocketIO('localhost', 8000, Namespace)
socketIO.wait()
socketIO = SocketIO('localhost', 8000)
socketIO.define(Namespace)
socketIO.emit('aaa')
socketIO.wait(seconds=1)
Define standard events. ::
@ -80,44 +78,40 @@ Define standard events. ::
class Namespace(BaseNamespace):
def on_connect(self, socketIO):
def on_connect(self):
print '[Connected]'
def on_disconnect(self):
print '[Disconnected]'
socketIO = SocketIO('localhost', 8000)
socketIO.define(Namespace)
socketIO.wait(seconds=1)
def on_error(self, name, message):
print '[Error] %s: %s' % (name, message)
def on_message(self, id, message):
print '[Message] %s: %s' % (id, message)
socketIO = SocketIO('localhost', 8000, Namespace)
socketIO.wait()
Define different behavior for different channels on a single socket. ::
Define different namespaces on a single socket. ::
from socketIO_client import SocketIO, BaseNamespace
class MainNamespace(BaseNamespace):
def on_aaa(self, *args):
print 'aaa', args
class ChatNamespace(BaseNamespace):
def on_bbb(self, *args):
print 'bbb', args
def on_aaa_response(self, *args):
print 'on_aaa_response', args
class NewsNamespace(BaseNamespace):
def on_ccc(self, *args):
print 'ccc', args
def on_aaa_response(self, *args):
print 'on_aaa_response', args
mainSocket = SocketIO('localhost', 8000, MainNamespace)
chatSocket = mainSocket.connect('/chat', ChatNamespace)
newsSocket = mainSocket.connect('/news', NewsNamespace)
mainSocket.wait()
socketIO = SocketIO('localhost', 8000)
chatNamespace = socketIO.define(ChatNamespace, '/chat')
newsNamespace = socketIO.define(NewsNamespace, '/news')
chatNamespace.emit('aaa')
newsNamespace.emit('aaa')
socketIO.wait(seconds=1)
Open secure websockets (HTTPS / WSS) behind a proxy. ::
SocketIO('localhost', 8000,
secure=True,
proxies={'https': 'https://proxy.example.com:8080'})
License
@ -125,14 +119,31 @@ License
This software is available under the MIT License.
Credits
-------
- `Guillermo Rauch`_ wrote the `socket.io specification`_.
- `Hiroki Ohtani`_ wrote websocket-client_.
- rod_ wrote a `prototype for a Python client to a socket.io server`_ on StackOverflow.
- `Alexandre Bourget`_ wrote gevent-socketio_, which is a socket.io server written in Python.
- `Paul Kienzle`_, `Zac Lee`_, `Josh VanderLinden`_, `Ian Fitzpatrick`_, `Lucas Klein`_ submitted code to expand support of the socket.io protocol.
.. _socket.io: http://socket.io
.. _rod: http://stackoverflow.com/users/370115/rod
.. _StackOverflowQA: http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client
__ StackOverflowQA_
.. _liris: https://github.com/liris
.. _websocket-client: https://github.com/liris/websocket-client
.. _guille: https://github.com/guille
.. _Guillermo Rauch: https://github.com/guille
.. _socket.io specification: https://github.com/LearnBoost/socket.io-spec
.. _Hiroki Ohtani: https://github.com/liris
.. _websocket-client: https://github.com/liris/websocket-client
.. _rod: http://stackoverflow.com/users/370115/rod
.. _prototype for a Python client to a socket.io server: http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client
.. _Alexandre Bourget: https://github.com/abourget
.. _gevent-socketio: https://github.com/abourget/gevent-socketio
.. _Paul Kienzle: https://github.com/pkienzle
.. _Zac Lee: https://github.com/zratic
.. _Josh VanderLinden: https://github.com/codekoala
.. _Ian Fitzpatrick: https://github.com/GraphEffect
.. _Lucas Klein: https://github.com/lukashed

7
TODO.goals Normal file
View file

@ -0,0 +1,7 @@
= Resolve pull requests
Resolve issues
Investigate issue #8
Examine forks
Integrate Sajal's fork #7
Integrate Francis's fork #10
Integrate Paul's fork

View file

71
serve_tests.js Normal file
View file

@ -0,0 +1,71 @@
var io = require('socket.io').listen(8000);
var main = io.of('').on('connection', function(socket) {
socket.on('message', function(data, fn) {
if (fn) { // Client expects a callback
if (data) {
fn(data);
} else {
fn();
}
} else if (typeof data === 'object') {
socket.json.send(data ? data : 'message_response'); // object or null
} else {
socket.send(data ? data : 'message_response'); // string or ''
}
});
socket.on('emit', function() {
socket.emit('emit_response');
});
socket.on('emit_with_payload', function(payload) {
socket.emit('emit_with_payload_response', payload);
});
socket.on('emit_with_multiple_payloads', function(payload, payload) {
socket.emit('emit_with_multiple_payloads_response', payload, payload);
});
socket.on('emit_with_callback', function(fn) {
fn();
});
socket.on('emit_with_callback_with_payload', function(fn) {
fn(PAYLOAD);
});
socket.on('emit_with_callback_with_multiple_payloads', function(fn) {
fn(PAYLOAD, PAYLOAD);
});
socket.on('emit_with_event', function(payload) {
socket.emit('emit_with_event_response', payload);
});
socket.on('ack', function(payload) {
socket.emit('ack_response', payload, function(payload) {
socket.emit('ack_callback_response', payload);
});
});
socket.on('aaa', function() {
socket.emit('aaa_response', PAYLOAD);
});
socket.on('bbb', function(payload, fn) {
if (fn) {
fn(payload);
}
});
});
var chat = io.of('/chat').on('connection', function (socket) {
socket.on('emit_with_payload', function(payload) {
socket.emit('emit_with_payload_response', payload);
});
socket.on('aaa', function() {
socket.emit('aaa_response', 'in chat');
});
});
var news = io.of('/news').on('connection', function (socket) {
socket.on('emit_with_payload', function(payload) {
socket.emit('emit_with_payload_response', payload);
});
socket.on('aaa', function() {
socket.emit('aaa_response', 'in news');
});
});
var PAYLOAD = {'xxx': 'yyy'};

View file

@ -1,29 +0,0 @@
'Launch this server in another terminal window before running tests'
from socketio import socketio_manage
from socketio.namespace import BaseNamespace
from socketio.server import SocketIOServer
class Namespace(BaseNamespace):
def on_aaa(self, *args):
self.socket.send_packet(dict(
type='event',
name='ddd',
args=args,
endpoint=self.ns_name))
class Application(object):
def __call__(self, environ, start_response):
socketio_manage(environ, {
'': Namespace,
'/chat': Namespace,
'/news': Namespace,
})
if __name__ == '__main__':
socketIOServer = SocketIOServer(('0.0.0.0', 8000), Application())
socketIOServer.serve_forever()

2
setup.py Executable file → Normal file
View file

@ -9,7 +9,7 @@ CHANGES = open(os.path.join(here, 'CHANGES.rst')).read()
setup(
name='socketIO-client',
version='0.3',
version='0.4',
description='A socket.io client library',
long_description=README + '\n\n' + CHANGES,
license='MIT',

View file

@ -1,35 +1,55 @@
import websocket
from anyjson import dumps, loads
import socket
from json import dumps, loads
from threading import Thread, Event
from time import sleep
from urllib import urlopen
from websocket import WebSocketConnectionClosedException, create_connection
__version__ = '0.3'
PROTOCOL = 1 # SocketIO protocol version
PROTOCOL = 1 # socket.io protocol version
class BaseNamespace(object): # pragma: no cover
'Define socket.io behavior'
def __init__(self, socketIO):
self.socketIO = socketIO
def __init__(self, _socketIO, path):
self._socketIO = _socketIO
self._path = path
self._callbackByEvent = {}
self.initialize()
def on_connect(self, socketIO):
def initialize(self):
'Initialize custom variables here; you can override this method'
pass
def on_connect(self):
'Called when socket is connecting; you can override this method'
pass
def on_disconnect(self):
'Called when socket is disconnecting; you can override this method'
pass
def on_error(self, reason, advice):
'Called when server sends an error; you can override this method'
print '[Error] %s' % advice
def on_message(self, messageData):
print '[Message] %s' % messageData
def on_message(self, data):
'Called when server sends a message; you can override this method'
print '[Message] %s' % data
def on_(self, eventName, *eventArguments):
print '[Event] %s%s' % (eventName, eventArguments)
def on_event(self, event, *args):
"""
Called when server emits an event; you can override this method.
Called only if the program cannot find a more specific event handler,
such as one defined by namespace.on('my_event', my_function).
"""
callback, args = find_callback(args)
arguments = [repr(_) for _ in args]
if callback:
arguments.append('callback(*args)')
callback(*args)
print '[Event] %s(%s)' % (event, ', '.join(arguments))
def on_open(self, *args):
print '[Open]', args
@ -43,143 +63,96 @@ class BaseNamespace(object): # pragma: no cover
def on_reconnect(self, *args):
print '[Reconnect]', args
def message(self, data='', callback=None):
self._socketIO.message(data, callback, path=self._path)
def emit(self, event, *args, **kw):
kw['path'] = self._path
self._socketIO.emit(event, *args, **kw)
def on(self, event, callback):
'Define a callback to handle a custom event emitted by the server'
self._callbackByEvent[event] = callback
def _get_eventCallback(self, event):
# Check callbacks defined by on()
try:
return self._callbackByEvent[event]
except KeyError:
pass
# Check callbacks defined explicitly or use on_event()
callback = lambda *args: self.on_event(event, *args)
return getattr(self, 'on_' + event.replace(' ', '_'), callback)
class SocketIO(object):
messageID = 0
def __init__(self, host, port, secure=False, proxies=None):
"""
Create a socket.io client that connects to a socket.io server
at the specified host and port. Set secure=True to use HTTPS / WSS.
def __init__(self, host, port, Namespace=BaseNamespace, secure=False):
self.host = host
self.port = int(port)
self.namespace = Namespace(self)
self.secure = secure
self.__connect()
SocketIO('localhost', 8000, secure=True,
proxies={'https': 'https://proxy.example.com:8080'})
"""
self._socketIO = _SocketIO(host, port, secure, proxies)
self._namespaceByPath = {}
self.define(BaseNamespace) # Define default namespace
heartbeatInterval = self.heartbeatTimeout - 2
self.heartbeatThread = RhythmicThread(heartbeatInterval,
self._send_heartbeat)
self.heartbeatThread.start()
self._rhythmicThread = _RhythmicThread(
self._socketIO.heartbeatInterval,
self._socketIO.send_heartbeat)
self._rhythmicThread.start()
self.channelByName = {}
self.callbackByEvent = {}
self.namespaceThread = ListenerThread(self)
self.namespaceThread.start()
self._listenerThread = _ListenerThread(
self._socketIO,
self._namespaceByPath)
self._listenerThread.start()
def __del__(self): # pragma: no cover
self.heartbeatThread.cancel()
self.namespaceThread.cancel()
self.connection.close()
def __enter__(self):
return self
def __connect(self):
baseURL = '%s:%d/socket.io/%s' % (self.host, self.port, PROTOCOL)
try:
response = urlopen('%s://%s/' % (
'https' if self.secure else 'http', baseURL))
except IOError: # pragma: no cover
raise SocketIOError('Could not start connection')
if 200 != response.getcode(): # pragma: no cover
raise SocketIOError('Could not establish connection')
responseParts = response.readline().split(':')
self.sessionID = responseParts[0]
self.heartbeatTimeout = int(responseParts[1])
self.connectionTimeout = int(responseParts[2])
self.supportedTransports = responseParts[3].split(',')
if 'websocket' not in self.supportedTransports:
raise SocketIOError('Could not parse handshake') # pragma: no cover
socketURL = '%s://%s/websocket/%s' % (
'wss' if self.secure else 'ws', baseURL, self.sessionID)
self.connection = websocket.create_connection(socketURL)
def __exit__(self, exc_type, exc_value, traceback):
self.disconnect()
def _recv_packet(self):
code, packetID, channelName, data = -1, None, None, None
packet = self.connection.recv()
packetParts = packet.split(':', 3)
packetCount = len(packetParts)
if 4 == packetCount:
code, packetID, channelName, data = packetParts
elif 3 == packetCount:
code, packetID, channelName = packetParts
elif 1 == packetCount: # pragma: no cover
code = packetParts[0]
return int(code), packetID, channelName, data
def _send_packet(self, code, channelName='', data='', callback=None):
self.connection.send(':'.join([
str(code),
self.set_callback(callback) if callback else '',
channelName,
data]))
def disconnect(self, channelName=''):
self._send_packet(0, channelName)
if channelName:
del self.channelByName[channelName]
else:
self.__del__()
def __del__(self):
self.disconnect(close=False)
@property
def connected(self):
return self.connection.connected
return self._socketIO.connected
def connect(self, channelName, Namespace=BaseNamespace):
channel = Channel(self, channelName, Namespace)
self.channelByName[channelName] = channel
self._send_packet(1, channelName)
return channel
def _send_heartbeat(self):
try:
self._send_packet(2)
except:
self.__del__()
def message(self, messageData, callback=None, channelName=''):
if isinstance(messageData, basestring):
code = 3
data = messageData
def disconnect(self, path='', close=True):
if self.connected:
self._socketIO.disconnect(path, close)
if path:
del self._namespaceByPath[path]
else:
code = 4
data = dumps(messageData)
self._send_packet(code, channelName, data, callback)
self._rhythmicThread.cancel()
self._listenerThread.cancel()
def emit(self, eventName, *eventArguments, **eventKeywords):
code = 5
if callable(eventArguments[-1]):
callback = eventArguments[-1]
eventArguments = eventArguments[:-1]
else:
callback = None
channelName = eventKeywords.get('channelName', '')
data = dumps(dict(name=eventName, args=eventArguments))
self._send_packet(code, channelName, data, callback)
def define(self, Namespace, path=''):
if path:
self._socketIO.connect(path)
namespace = Namespace(self._socketIO, path)
self._namespaceByPath[path] = namespace
return namespace
def get_callback(self, channelName, eventName):
'Get callback associated with channelName and eventName'
socketIO = self.channelByName[channelName] if channelName else self
try:
return socketIO.callbackByEvent[eventName]
except KeyError:
pass
namespace = socketIO.namespace
def get_namespace(self, path=''):
return self._namespaceByPath[path]
def callback_(*eventArguments):
return namespace.on_(eventName, *eventArguments)
return getattr(namespace, name_callback(eventName), callback_)
def on(self, event, callback, path=''):
return self.get_namespace(path).on(event, callback)
def set_callback(self, callback):
'Set callback that will be called after receiving an acknowledgment'
self.messageID += 1
self.namespaceThread.set_callback(self.messageID, callback)
return '%s+' % self.messageID
def message(self, data='', callback=None, path=''):
self._socketIO.message(data, callback, path)
def on(self, eventName, callback):
self.callbackByEvent[eventName] = callback
def emit(self, event, *args, **kw):
self._socketIO.emit(event, *args, **kw)
def wait(self, seconds=None, forCallbacks=False):
if forCallbacks:
self.namespaceThread.wait_for_callbacks(seconds)
elif seconds:
sleep(seconds)
def wait(self, seconds=None):
if seconds:
self._listenerThread.wait(seconds)
else:
try:
while self.connected:
@ -187,149 +160,281 @@ class SocketIO(object):
except KeyboardInterrupt:
pass
class Channel(object):
def __init__(self, socketIO, channelName, Namespace):
self.socketIO = socketIO
self.channelName = channelName
self.namespace = Namespace(self)
self.callbackByEvent = {}
def disconnect(self):
self.socketIO.disconnect(self.channelName)
def emit(self, eventName, *eventArguments):
self.socketIO.emit(eventName, *eventArguments,
channelName=self.channelName)
def message(self, messageData, callback=None):
self.socketIO.message(messageData, callback,
channelName=self.channelName)
def on(self, eventName, eventCallback):
self.callbackByEvent[eventName] = eventCallback
def wait_for_callbacks(self, seconds=None):
self._listenerThread.wait_for_callbacks(seconds)
class ListenerThread(Thread):
'Process messages from SocketIO server'
class _RhythmicThread(Thread):
'Execute call every few seconds'
daemon = True
def __init__(self, socketIO):
super(ListenerThread, self).__init__()
self.socketIO = socketIO
self.done = Event()
self.waitingForCallbacks = Event()
self.callbackByMessageID = {}
self.get_callback = self.socketIO.get_callback
def run(self):
while not self.done.is_set():
try:
code, packetID, channelName, data = self.socketIO._recv_packet()
except:
continue
try:
delegate = {
0: self.on_disconnect,
1: self.on_connect,
2: self.on_heartbeat,
3: self.on_message,
4: self.on_json,
5: self.on_event,
6: self.on_acknowledgment,
7: self.on_error,
}[code]
except KeyError:
continue
delegate(packetID, channelName, data)
def cancel(self):
self.done.set()
def wait_for_callbacks(self, seconds):
self.waitingForCallbacks.set()
self.join(seconds)
def set_callback(self, messageID, callback):
self.callbackByMessageID[messageID] = callback
def on_disconnect(self, packetID, channelName, data):
callback = self.get_callback(channelName, 'disconnect')
callback()
def on_connect(self, packetID, channelName, data):
callback = self.get_callback(channelName, 'connect')
callback(self.socketIO)
def on_heartbeat(self, packetID, channelName, data):
pass
def on_message(self, packetID, channelName, data):
callback = self.get_callback(channelName, 'message')
callback(data)
def on_json(self, packetID, channelName, data):
callback = self.get_callback(channelName, 'message')
callback(loads(data))
def on_event(self, packetID, channelName, data):
valueByName = loads(data)
eventName = valueByName['name']
eventArguments = valueByName.get('args', [])
callback = self.get_callback(channelName, eventName)
callback(*eventArguments)
def on_acknowledgment(self, packetID, channelName, data):
dataParts = data.split('+', 1)
messageID = int(dataParts[0])
arguments = loads(dataParts[1]) or []
try:
callback = self.callbackByMessageID[messageID]
except KeyError:
pass
else:
del self.callbackByMessageID[messageID]
callback(*arguments)
callbackCount = len(self.callbackByMessageID)
if self.waitingForCallbacks.is_set() and not callbackCount:
self.cancel()
def on_error(self, packetID, channelName, data):
reason, advice = data.split('+', 1)
callback = self.get_callback(channelName, 'error')
callback(reason, advice)
class RhythmicThread(Thread):
'Execute rhythmicFunction every few seconds'
daemon = True
def __init__(self, intervalInSeconds, rhythmicFunction, *args, **kw):
super(RhythmicThread, self).__init__()
def __init__(self, intervalInSeconds, call, *args, **kw):
super(_RhythmicThread, self).__init__()
self.intervalInSeconds = intervalInSeconds
self.rhythmicFunction = rhythmicFunction
self.call = call
self.args = args
self.kw = kw
self.done = Event()
def run(self):
try:
while not self.done.is_set():
self.rhythmicFunction(*self.args, **self.kw)
self.done.wait(self.intervalInSeconds)
except:
pass
while not self.done.is_set():
self.call(*self.args, **self.kw)
self.done.wait(self.intervalInSeconds)
def cancel(self):
self.done.set()
class _ListenerThread(Thread):
'Process messages from socket.io server'
daemon = True
def __init__(self, _socketIO, _namespaceByPath):
super(_ListenerThread, self).__init__()
self._socketIO = _socketIO
self._namespaceByPath = _namespaceByPath
self.done = Event()
self.ready = Event()
self.ready.set()
def cancel(self):
self.done.set()
def wait(self, seconds):
self.done.wait(seconds)
def wait_for_callbacks(self, seconds):
self.ready.clear()
self.ready.wait(seconds)
def get_ackCallback(self, packetID):
return lambda *args: self._socketIO.ack(packetID, *args)
def run(self):
while not self.done.is_set():
try:
code, packetID, path, data = self._socketIO.recv_packet()
except SocketIOConnectionError, error:
print error
return
except SocketIOPacketError, error:
print error
continue
try:
namespace = self._namespaceByPath[path]
except KeyError:
print 'Received unexpected path (%s)' % path
continue
try:
delegate = {
'0': self.on_disconnect,
'1': self.on_connect,
'2': self.on_heartbeat,
'3': self.on_message,
'4': self.on_json,
'5': self.on_event,
'6': self.on_ack,
'7': self.on_error,
}[code]
except KeyError:
print 'Received unexpected code (%s)' % code
continue
delegate(packetID, namespace._get_eventCallback, data)
def on_disconnect(self, packetID, get_eventCallback, data):
get_eventCallback('disconnect')()
def on_connect(self, packetID, get_eventCallback, data):
get_eventCallback('connect')()
def on_heartbeat(self, packetID, get_eventCallback, data):
pass
def on_message(self, packetID, get_eventCallback, data):
args = [data]
if packetID:
args.append(self.get_ackCallback(packetID))
get_eventCallback('message')(*args)
def on_json(self, packetID, get_eventCallback, data):
args = [loads(data)]
if packetID:
args.append(self.get_ackCallback(packetID))
get_eventCallback('message')(*args)
def on_event(self, packetID, get_eventCallback, data):
valueByName = loads(data)
event = valueByName['name']
args = valueByName.get('args', [])
if packetID:
args.append(self.get_ackCallback(packetID))
get_eventCallback(event)(*args)
def on_ack(self, packetID, get_eventCallback, data):
dataParts = data.split('+', 1)
messageID = int(dataParts[0])
args = loads(dataParts[1]) if len(dataParts) > 1 else []
callback = self._socketIO.get_messageCallback(messageID)
if not callback:
return
callback(*args)
if not self._socketIO.has_messageCallback:
self.ready.set()
def on_error(self, packetID, get_eventCallback, data):
reason, advice = data.split('+', 1)
get_eventCallback('error')(reason, advice)
class _SocketIO(object):
'Low-level interface to remove cyclic references in child threads'
messageID = 0
def __init__(self, host, port, secure, proxies):
baseURL = '%s:%d/socket.io/%s' % (host, port, PROTOCOL)
targetScheme = 'https' if secure else 'http'
targetURL = '%s://%s/' % (targetScheme, baseURL)
try:
response = urlopen(targetURL, proxies=proxies)
except IOError: # pragma: no cover
raise SocketIOError('Could not start connection')
if 200 != response.getcode(): # pragma: no cover
raise SocketIOError('Could not establish connection')
responseParts = response.readline().split(':')
sessionID = responseParts[0]
heartbeatTimeout = int(responseParts[1])
# connectionTimeout = int(responseParts[2])
supportedTransports = responseParts[3].split(',')
if 'websocket' not in supportedTransports:
raise SocketIOError('Could not parse handshake')
socketScheme = 'wss' if secure else 'ws'
socketURL = '%s://%s/websocket/%s' % (socketScheme, baseURL, sessionID)
self.connection = create_connection(socketURL)
self.heartbeatInterval = heartbeatTimeout - 2
self.callbackByMessageID = {}
def __del__(self):
self.disconnect(close=False)
def disconnect(self, path='', close=True):
if not self.connected:
return
if path:
self.send_packet(0, path)
elif close:
self.connection.close()
def connect(self, path):
self.send_packet(1, path)
def send_heartbeat(self):
try:
self.send_packet(2)
except SocketIOPacketError:
print 'Could not send heartbeat'
pass
def message(self, data, callback, path):
if isinstance(data, basestring):
code = 3
packetData = data
else:
code = 4
packetData = dumps(data, ensure_ascii=False)
self.send_packet(code, path, packetData, callback)
def emit(self, event, *args, **kw):
callback, args = find_callback(args, kw)
packetData = dumps(dict(name=event, args=args), ensure_ascii=False)
path = kw.get('path', '')
self.send_packet(5, path, packetData, callback)
def ack(self, packetID, *args):
packetID = packetID.rstrip('+')
packetData = '%s+%s' % (
packetID,
dumps(args, ensure_ascii=False),
) if args else packetID
self.send_packet(6, data=packetData)
def set_messageCallback(self, callback):
'Set callback that will be called after receiving an acknowledgment'
self.messageID += 1
self.callbackByMessageID[self.messageID] = callback
return '%s+' % self.messageID
def get_messageCallback(self, messageID):
try:
callback = self.callbackByMessageID[messageID]
del self.callbackByMessageID[messageID]
return callback
except KeyError:
return
@property
def has_messageCallback(self):
return True if self.callbackByMessageID else False
def recv_packet(self):
try:
packet = self.connection.recv()
except WebSocketConnectionClosedException:
text = 'Lost connection (Connection closed)'
raise SocketIOConnectionError(text)
except socket.timeout:
text = 'Lost connection (Connection timed out)'
raise SocketIOConnectionError(text)
except socket.error:
text = 'Lost connection'
raise SocketIOConnectionError(text)
try:
packetParts = packet.split(':', 3)
except AttributeError:
raise SocketIOPacketError('Received invalid packet (%s)' % packet)
packetCount = len(packetParts)
code, packetID, path, data = None, None, None, None
if 4 == packetCount:
code, packetID, path, data = packetParts
elif 3 == packetCount:
code, packetID, path = packetParts
elif 1 == packetCount:
code = packetParts[0]
return code, packetID, path, data
def send_packet(self, code, path='', data='', callback=None):
packetID = self.set_messageCallback(callback) if callback else ''
packetParts = [str(code), packetID, path, data]
try:
packet = ':'.join(packetParts)
self.connection.send(packet)
except socket.error:
raise SocketIOPacketError('Could not send packet')
@property
def connected(self):
return self.connection.connected
class SocketIOError(Exception):
pass
def name_callback(eventName):
return 'on_' + eventName.replace(' ', '_')
class SocketIOConnectionError(SocketIOError):
pass
class SocketIOPacketError(SocketIOError):
pass
def find_callback(args, kw=None):
'Return callback whether passed as a last argument or as a keyword'
if args and callable(args[-1]):
return args[-1], args[:-1]
try:
return kw['callback'], args
except (KeyError, TypeError):
return None, args

View file

@ -1,61 +1,174 @@
from socketIO_client import SocketIO, BaseNamespace
from time import sleep
from socketIO_client import SocketIO, BaseNamespace, find_callback
from unittest import TestCase
PAYLOAD = {'bbb': 'ccc'}
ON_RESPONSE_CALLED = False
HOST = 'localhost'
PORT = 8000
DATA = 'xxx'
PAYLOAD = {'xxx': 'yyy'}
class TestSocketIO(TestCase):
def setUp(self):
self.socketIO = SocketIO(HOST, PORT)
self.called_on_response = False
def tearDown(self):
del self.socketIO
def on_response(self, *args):
self.called_on_response = True
for arg in args:
if isinstance(arg, dict):
self.assertEqual(arg, PAYLOAD)
else:
self.assertEqual(arg, DATA)
def is_connected(self, socketIO, connected):
childThreads = [
socketIO._rhythmicThread,
socketIO._listenerThread,
]
for childThread in childThreads:
self.assertEqual(not connected, childThread.done.is_set())
self.assertEqual(connected, socketIO.connected)
def test_disconnect(self):
socketIO = SocketIO('localhost', 8000)
socketIO.disconnect()
self.assertEqual(socketIO.connected, False)
'Terminate child threads after disconnect'
self.is_connected(self.socketIO, True)
self.socketIO.disconnect()
self.is_connected(self.socketIO, False)
# Use context manager
with SocketIO(HOST, PORT) as self.socketIO:
self.is_connected(self.socketIO, True)
self.is_connected(self.socketIO, False)
def test_message(self):
'Message'
self.socketIO.define(Namespace)
self.socketIO.message()
self.socketIO.wait(0.1)
namespace = self.socketIO.get_namespace()
self.assertEqual(namespace.response, 'message_response')
def test_message_with_data(self):
'Message with data'
self.socketIO.define(Namespace)
self.socketIO.message(DATA)
self.socketIO.wait(0.1)
namespace = self.socketIO.get_namespace()
self.assertEqual(namespace.response, DATA)
def test_message_with_payload(self):
'Message with payload'
self.socketIO.define(Namespace)
self.socketIO.message(PAYLOAD)
self.socketIO.wait(0.1)
namespace = self.socketIO.get_namespace()
self.assertEqual(namespace.response, PAYLOAD)
def test_message_with_callback(self):
'Message with callback'
self.socketIO.message(callback=self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_message_with_callback_with_data(self):
'Message with callback with data'
self.socketIO.message(DATA, self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_emit(self):
socketIO = SocketIO('localhost', 8000, Namespace)
socketIO.emit('aaa', PAYLOAD)
sleep(0.5)
self.assertEqual(socketIO.namespace.payload, PAYLOAD)
'Emit'
self.socketIO.define(Namespace)
self.socketIO.emit('emit')
self.socketIO.wait(0.1)
self.assertEqual(self.socketIO.get_namespace().argsByEvent, {
'emit_response': (),
})
def test_emit_with_payload(self):
'Emit with payload'
self.socketIO.define(Namespace)
self.socketIO.emit('emit_with_payload', PAYLOAD)
self.socketIO.wait(0.1)
self.assertEqual(self.socketIO.get_namespace().argsByEvent, {
'emit_with_payload_response': (PAYLOAD,),
})
def test_emit_with_multiple_payloads(self):
'Emit with multiple payloads'
self.socketIO.define(Namespace)
self.socketIO.emit('emit_with_multiple_payloads', PAYLOAD, PAYLOAD)
self.socketIO.wait(0.1)
self.assertEqual(self.socketIO.get_namespace().argsByEvent, {
'emit_with_multiple_payloads_response': (PAYLOAD, PAYLOAD),
})
def test_emit_with_callback(self):
global ON_RESPONSE_CALLED
ON_RESPONSE_CALLED = False
socketIO = SocketIO('localhost', 8000)
socketIO.emit('aaa', PAYLOAD, on_response)
socketIO.wait(forCallbacks=True)
self.assertEqual(ON_RESPONSE_CALLED, True)
'Emit with callback'
self.socketIO.emit('emit_with_callback', self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_events(self):
global ON_RESPONSE_CALLED
ON_RESPONSE_CALLED = False
socketIO = SocketIO('localhost', 8000)
socketIO.on('ddd', on_response)
socketIO.emit('aaa', PAYLOAD)
sleep(0.5)
self.assertEqual(ON_RESPONSE_CALLED, True)
def test_emit_with_callback_with_payload(self):
'Emit with callback with payload'
self.socketIO.emit('emit_with_callback_with_payload',
self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_channels(self):
mainSocket = SocketIO('localhost', 8000, Namespace)
chatSocket = mainSocket.connect('/chat', Namespace)
newsSocket = mainSocket.connect('/news', Namespace)
newsSocket.emit('aaa', PAYLOAD)
sleep(0.5)
self.assertNotEqual(mainSocket.namespace.payload, PAYLOAD)
self.assertNotEqual(chatSocket.namespace.payload, PAYLOAD)
self.assertEqual(newsSocket.namespace.payload, PAYLOAD)
def test_emit_with_callback_with_multiple_payloads(self):
'Emit with callback with multiple payloads'
self.socketIO.emit('emit_with_callback_with_multiple_payloads',
self.on_response)
self.socketIO.wait_for_callbacks(seconds=0.1)
self.assertEqual(self.called_on_response, True)
def test_emit_with_event(self):
'Emit to trigger an event'
self.socketIO.on('emit_with_event_response', self.on_response)
self.socketIO.emit('emit_with_event', PAYLOAD)
self.socketIO.wait_for_callbacks(0.1)
self.assertEqual(self.called_on_response, True)
def test_ack(self):
'Trigger server callback'
self.socketIO.define(Namespace)
self.socketIO.emit('ack', PAYLOAD)
self.socketIO.wait(0.1)
self.assertEqual(self.socketIO.get_namespace().argsByEvent, {
'ack_response': (PAYLOAD,),
'ack_callback_response': (PAYLOAD,),
})
def test_namespaces(self):
'Behave differently in different namespaces'
mainNamespace = self.socketIO.define(Namespace)
chatNamespace = self.socketIO.define(Namespace, '/chat')
newsNamespace = self.socketIO.define(Namespace, '/news')
newsNamespace.emit('emit_with_payload', PAYLOAD)
self.socketIO.wait(0.1)
self.assertEqual(mainNamespace.argsByEvent, {})
self.assertEqual(chatNamespace.argsByEvent, {})
self.assertEqual(newsNamespace.argsByEvent, {
'emit_with_payload_response': (PAYLOAD,),
})
class Namespace(BaseNamespace):
payload = None
def initialize(self):
self.response = None
self.argsByEvent = {}
def on_ddd(self, data):
self.payload = data
def on_message(self, data):
self.response = data
def on_response(*args):
global ON_RESPONSE_CALLED
ON_RESPONSE_CALLED = True
def on_event(self, event, *args):
callback, args = find_callback(args)
if callback:
callback(*args)
self.argsByEvent[event] = args