diff --git a/.gitignore b/.gitignore index 15e22ab..6cf8c00 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,9 @@ *~ +*.sw[op] +*.py[cod] +*.egg *.egg-info -*.pyc -*.swo -*.swp -.coverage build dist +sdist +.coverage diff --git a/CHANGES.rst b/CHANGES.rst index 8afe01b..eeae8ed 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,8 +1,14 @@ +0.4 +--- +- Added support for server-side callbacks thanks to Zac Lee +- Added low-level _SocketIO to remove cyclic references +- Merged Channel functionality into BaseNamespace thanks to Alexandre Bourget + 0.3 --- - Added support for secure connections - Added socketIO.wait() -- Improved exception handling in heartbeatThread and namespaceThread +- Improved exception handling in _RhythmicThread and _ListenerThread 0.2 --- diff --git a/LICENSE b/LICENSE index 2e7adea..4e68cea 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2012 Roy Hyunjin Han and contributors +Copyright (c) 2013 Roy Hyunjin Han and contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/README.rst b/README.rst index bba2b1d..3c348b8 100644 --- a/README.rst +++ b/README.rst @@ -2,12 +2,6 @@ socketIO-client =============== Here is a socket.io_ client library for Python. You can use it to write test code for your socket.io_ server. -Thanks to rod_ for the `StackOverflow question and answer`__ on which this code is based. - -Thanks to liris_ for websocket-client_ and to guille_ for the `socket.io specification`_. - -Thanks to `Paul Kienzle`_, `Josh VanderLinden`_, `Ian Fitzpatrick`_ for submitting code to expand support of the socket.io protocol. - Installation ------------ @@ -22,7 +16,7 @@ Installation source $VIRTUAL_ENV/bin/activate # Install package - easy_install -U socketIO-client + pip install -U socketIO-client Usage @@ -36,31 +30,32 @@ Emit. :: from socketIO_client import SocketIO - socketIO = SocketIO('localhost', 8000) - socketIO.emit('aaa', {'bbb': 'ccc'}) - socketIO.wait(seconds=1) + with SocketIO('localhost', 8000) as socketIO: + socketIO.emit('aaa') + socketIO.wait(seconds=1) Emit with callback. :: from socketIO_client import SocketIO - def on_response(*args): - print args + def on_bbb_response(*args): + print 'on_bbb_response', args - socketIO = SocketIO('localhost', 8000) - socketIO.emit('aaa', {'bbb': 'ccc'}, on_response) - socketIO.wait(forCallbacks=True) + with SocketIO('localhost', 8000) as socketIO: + socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response) + socketIO.wait_for_callbacks(seconds=1) Define events. :: from socketIO_client import SocketIO - def on_ddd(*args): - print args + def on_aaa_response(*args): + print 'on_aaa_response', args socketIO = SocketIO('localhost', 8000) - socketIO.on('ddd', on_ddd) - socketIO.wait() + socketIO.on('aaa_response', on_aaa_response) + socketIO.emit('aaa') + socketIO.wait(seconds=1) Define events in a namespace. :: @@ -68,11 +63,14 @@ Define events in a namespace. :: class Namespace(BaseNamespace): - def on_ddd(self, *args): - self.socketIO.emit('eee', {'fff': 'ggg'}) + def on_aaa_response(self, *args): + print 'on_aaa_response', args + self.emit('bbb') - socketIO = SocketIO('localhost', 8000, Namespace) - socketIO.wait() + socketIO = SocketIO('localhost', 8000) + socketIO.define(Namespace) + socketIO.emit('aaa') + socketIO.wait(seconds=1) Define standard events. :: @@ -80,44 +78,40 @@ Define standard events. :: class Namespace(BaseNamespace): - def on_connect(self, socketIO): + def on_connect(self): print '[Connected]' - def on_disconnect(self): - print '[Disconnected]' + socketIO = SocketIO('localhost', 8000) + socketIO.define(Namespace) + socketIO.wait(seconds=1) - def on_error(self, name, message): - print '[Error] %s: %s' % (name, message) - - def on_message(self, id, message): - print '[Message] %s: %s' % (id, message) - - socketIO = SocketIO('localhost', 8000, Namespace) - socketIO.wait() - -Define different behavior for different channels on a single socket. :: +Define different namespaces on a single socket. :: from socketIO_client import SocketIO, BaseNamespace - class MainNamespace(BaseNamespace): - - def on_aaa(self, *args): - print 'aaa', args - class ChatNamespace(BaseNamespace): - def on_bbb(self, *args): - print 'bbb', args + def on_aaa_response(self, *args): + print 'on_aaa_response', args class NewsNamespace(BaseNamespace): - def on_ccc(self, *args): - print 'ccc', args + def on_aaa_response(self, *args): + print 'on_aaa_response', args - mainSocket = SocketIO('localhost', 8000, MainNamespace) - chatSocket = mainSocket.connect('/chat', ChatNamespace) - newsSocket = mainSocket.connect('/news', NewsNamespace) - mainSocket.wait() + socketIO = SocketIO('localhost', 8000) + chatNamespace = socketIO.define(ChatNamespace, '/chat') + newsNamespace = socketIO.define(NewsNamespace, '/news') + + chatNamespace.emit('aaa') + newsNamespace.emit('aaa') + socketIO.wait(seconds=1) + +Open secure websockets (HTTPS / WSS) behind a proxy. :: + + SocketIO('localhost', 8000, + secure=True, + proxies={'https': 'https://proxy.example.com:8080'}) License @@ -125,14 +119,31 @@ License This software is available under the MIT License. +Credits +------- +- `Guillermo Rauch`_ wrote the `socket.io specification`_. +- `Hiroki Ohtani`_ wrote websocket-client_. +- rod_ wrote a `prototype for a Python client to a socket.io server`_ on StackOverflow. +- `Alexandre Bourget`_ wrote gevent-socketio_, which is a socket.io server written in Python. +- `Paul Kienzle`_, `Zac Lee`_, `Josh VanderLinden`_, `Ian Fitzpatrick`_, `Lucas Klein`_ submitted code to expand support of the socket.io protocol. + + .. _socket.io: http://socket.io -.. _rod: http://stackoverflow.com/users/370115/rod -.. _StackOverflowQA: http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client -__ StackOverflowQA_ -.. _liris: https://github.com/liris -.. _websocket-client: https://github.com/liris/websocket-client -.. _guille: https://github.com/guille + +.. _Guillermo Rauch: https://github.com/guille .. _socket.io specification: https://github.com/LearnBoost/socket.io-spec + +.. _Hiroki Ohtani: https://github.com/liris +.. _websocket-client: https://github.com/liris/websocket-client + +.. _rod: http://stackoverflow.com/users/370115/rod +.. _prototype for a Python client to a socket.io server: http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client + +.. _Alexandre Bourget: https://github.com/abourget +.. _gevent-socketio: https://github.com/abourget/gevent-socketio + .. _Paul Kienzle: https://github.com/pkienzle +.. _Zac Lee: https://github.com/zratic .. _Josh VanderLinden: https://github.com/codekoala .. _Ian Fitzpatrick: https://github.com/GraphEffect +.. _Lucas Klein: https://github.com/lukashed diff --git a/TODO.goals b/TODO.goals new file mode 100644 index 0000000..84b159c --- /dev/null +++ b/TODO.goals @@ -0,0 +1,7 @@ += Resolve pull requests +Resolve issues + Investigate issue #8 +Examine forks + Integrate Sajal's fork #7 + Integrate Francis's fork #10 + Integrate Paul's fork diff --git a/TODO.rst b/TODO.rst deleted file mode 100644 index e69de29..0000000 diff --git a/serve_tests.js b/serve_tests.js new file mode 100644 index 0000000..82c8024 --- /dev/null +++ b/serve_tests.js @@ -0,0 +1,71 @@ +var io = require('socket.io').listen(8000); + +var main = io.of('').on('connection', function(socket) { + socket.on('message', function(data, fn) { + if (fn) { // Client expects a callback + if (data) { + fn(data); + } else { + fn(); + } + } else if (typeof data === 'object') { + socket.json.send(data ? data : 'message_response'); // object or null + } else { + socket.send(data ? data : 'message_response'); // string or '' + } + }); + socket.on('emit', function() { + socket.emit('emit_response'); + }); + socket.on('emit_with_payload', function(payload) { + socket.emit('emit_with_payload_response', payload); + }); + socket.on('emit_with_multiple_payloads', function(payload, payload) { + socket.emit('emit_with_multiple_payloads_response', payload, payload); + }); + socket.on('emit_with_callback', function(fn) { + fn(); + }); + socket.on('emit_with_callback_with_payload', function(fn) { + fn(PAYLOAD); + }); + socket.on('emit_with_callback_with_multiple_payloads', function(fn) { + fn(PAYLOAD, PAYLOAD); + }); + socket.on('emit_with_event', function(payload) { + socket.emit('emit_with_event_response', payload); + }); + socket.on('ack', function(payload) { + socket.emit('ack_response', payload, function(payload) { + socket.emit('ack_callback_response', payload); + }); + }); + socket.on('aaa', function() { + socket.emit('aaa_response', PAYLOAD); + }); + socket.on('bbb', function(payload, fn) { + if (fn) { + fn(payload); + } + }); +}); + +var chat = io.of('/chat').on('connection', function (socket) { + socket.on('emit_with_payload', function(payload) { + socket.emit('emit_with_payload_response', payload); + }); + socket.on('aaa', function() { + socket.emit('aaa_response', 'in chat'); + }); +}); + +var news = io.of('/news').on('connection', function (socket) { + socket.on('emit_with_payload', function(payload) { + socket.emit('emit_with_payload_response', payload); + }); + socket.on('aaa', function() { + socket.emit('aaa_response', 'in news'); + }); +}); + +var PAYLOAD = {'xxx': 'yyy'}; diff --git a/serve_tests.py b/serve_tests.py deleted file mode 100755 index 47c946a..0000000 --- a/serve_tests.py +++ /dev/null @@ -1,29 +0,0 @@ -'Launch this server in another terminal window before running tests' -from socketio import socketio_manage -from socketio.namespace import BaseNamespace -from socketio.server import SocketIOServer - - -class Namespace(BaseNamespace): - - def on_aaa(self, *args): - self.socket.send_packet(dict( - type='event', - name='ddd', - args=args, - endpoint=self.ns_name)) - - -class Application(object): - - def __call__(self, environ, start_response): - socketio_manage(environ, { - '': Namespace, - '/chat': Namespace, - '/news': Namespace, - }) - - -if __name__ == '__main__': - socketIOServer = SocketIOServer(('0.0.0.0', 8000), Application()) - socketIOServer.serve_forever() diff --git a/setup.py b/setup.py old mode 100755 new mode 100644 index 2651b7a..0f79923 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ CHANGES = open(os.path.join(here, 'CHANGES.rst')).read() setup( name='socketIO-client', - version='0.3', + version='0.4', description='A socket.io client library', long_description=README + '\n\n' + CHANGES, license='MIT', diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index c6b3ce0..5d3499d 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,35 +1,55 @@ -import websocket -from anyjson import dumps, loads +import socket +from json import dumps, loads from threading import Thread, Event from time import sleep from urllib import urlopen +from websocket import WebSocketConnectionClosedException, create_connection -__version__ = '0.3' - - -PROTOCOL = 1 # SocketIO protocol version +PROTOCOL = 1 # socket.io protocol version class BaseNamespace(object): # pragma: no cover + 'Define socket.io behavior' - def __init__(self, socketIO): - self.socketIO = socketIO + def __init__(self, _socketIO, path): + self._socketIO = _socketIO + self._path = path + self._callbackByEvent = {} + self.initialize() - def on_connect(self, socketIO): + def initialize(self): + 'Initialize custom variables here; you can override this method' + pass + + def on_connect(self): + 'Called when socket is connecting; you can override this method' pass def on_disconnect(self): + 'Called when socket is disconnecting; you can override this method' pass def on_error(self, reason, advice): + 'Called when server sends an error; you can override this method' print '[Error] %s' % advice - def on_message(self, messageData): - print '[Message] %s' % messageData + def on_message(self, data): + 'Called when server sends a message; you can override this method' + print '[Message] %s' % data - def on_(self, eventName, *eventArguments): - print '[Event] %s%s' % (eventName, eventArguments) + def on_event(self, event, *args): + """ + Called when server emits an event; you can override this method. + Called only if the program cannot find a more specific event handler, + such as one defined by namespace.on('my_event', my_function). + """ + callback, args = find_callback(args) + arguments = [repr(_) for _ in args] + if callback: + arguments.append('callback(*args)') + callback(*args) + print '[Event] %s(%s)' % (event, ', '.join(arguments)) def on_open(self, *args): print '[Open]', args @@ -43,143 +63,96 @@ class BaseNamespace(object): # pragma: no cover def on_reconnect(self, *args): print '[Reconnect]', args + def message(self, data='', callback=None): + self._socketIO.message(data, callback, path=self._path) + + def emit(self, event, *args, **kw): + kw['path'] = self._path + self._socketIO.emit(event, *args, **kw) + + def on(self, event, callback): + 'Define a callback to handle a custom event emitted by the server' + self._callbackByEvent[event] = callback + + def _get_eventCallback(self, event): + # Check callbacks defined by on() + try: + return self._callbackByEvent[event] + except KeyError: + pass + # Check callbacks defined explicitly or use on_event() + callback = lambda *args: self.on_event(event, *args) + return getattr(self, 'on_' + event.replace(' ', '_'), callback) + class SocketIO(object): - messageID = 0 + def __init__(self, host, port, secure=False, proxies=None): + """ + Create a socket.io client that connects to a socket.io server + at the specified host and port. Set secure=True to use HTTPS / WSS. - def __init__(self, host, port, Namespace=BaseNamespace, secure=False): - self.host = host - self.port = int(port) - self.namespace = Namespace(self) - self.secure = secure - self.__connect() + SocketIO('localhost', 8000, secure=True, + proxies={'https': 'https://proxy.example.com:8080'}) + """ + self._socketIO = _SocketIO(host, port, secure, proxies) + self._namespaceByPath = {} + self.define(BaseNamespace) # Define default namespace - heartbeatInterval = self.heartbeatTimeout - 2 - self.heartbeatThread = RhythmicThread(heartbeatInterval, - self._send_heartbeat) - self.heartbeatThread.start() + self._rhythmicThread = _RhythmicThread( + self._socketIO.heartbeatInterval, + self._socketIO.send_heartbeat) + self._rhythmicThread.start() - self.channelByName = {} - self.callbackByEvent = {} - self.namespaceThread = ListenerThread(self) - self.namespaceThread.start() + self._listenerThread = _ListenerThread( + self._socketIO, + self._namespaceByPath) + self._listenerThread.start() - def __del__(self): # pragma: no cover - self.heartbeatThread.cancel() - self.namespaceThread.cancel() - self.connection.close() + def __enter__(self): + return self - def __connect(self): - baseURL = '%s:%d/socket.io/%s' % (self.host, self.port, PROTOCOL) - try: - response = urlopen('%s://%s/' % ( - 'https' if self.secure else 'http', baseURL)) - except IOError: # pragma: no cover - raise SocketIOError('Could not start connection') - if 200 != response.getcode(): # pragma: no cover - raise SocketIOError('Could not establish connection') - responseParts = response.readline().split(':') - self.sessionID = responseParts[0] - self.heartbeatTimeout = int(responseParts[1]) - self.connectionTimeout = int(responseParts[2]) - self.supportedTransports = responseParts[3].split(',') - if 'websocket' not in self.supportedTransports: - raise SocketIOError('Could not parse handshake') # pragma: no cover - socketURL = '%s://%s/websocket/%s' % ( - 'wss' if self.secure else 'ws', baseURL, self.sessionID) - self.connection = websocket.create_connection(socketURL) + def __exit__(self, exc_type, exc_value, traceback): + self.disconnect() - def _recv_packet(self): - code, packetID, channelName, data = -1, None, None, None - packet = self.connection.recv() - packetParts = packet.split(':', 3) - packetCount = len(packetParts) - if 4 == packetCount: - code, packetID, channelName, data = packetParts - elif 3 == packetCount: - code, packetID, channelName = packetParts - elif 1 == packetCount: # pragma: no cover - code = packetParts[0] - return int(code), packetID, channelName, data - - def _send_packet(self, code, channelName='', data='', callback=None): - self.connection.send(':'.join([ - str(code), - self.set_callback(callback) if callback else '', - channelName, - data])) - - def disconnect(self, channelName=''): - self._send_packet(0, channelName) - if channelName: - del self.channelByName[channelName] - else: - self.__del__() + def __del__(self): + self.disconnect(close=False) @property def connected(self): - return self.connection.connected + return self._socketIO.connected - def connect(self, channelName, Namespace=BaseNamespace): - channel = Channel(self, channelName, Namespace) - self.channelByName[channelName] = channel - self._send_packet(1, channelName) - return channel - - def _send_heartbeat(self): - try: - self._send_packet(2) - except: - self.__del__() - - def message(self, messageData, callback=None, channelName=''): - if isinstance(messageData, basestring): - code = 3 - data = messageData + def disconnect(self, path='', close=True): + if self.connected: + self._socketIO.disconnect(path, close) + if path: + del self._namespaceByPath[path] else: - code = 4 - data = dumps(messageData) - self._send_packet(code, channelName, data, callback) + self._rhythmicThread.cancel() + self._listenerThread.cancel() - def emit(self, eventName, *eventArguments, **eventKeywords): - code = 5 - if callable(eventArguments[-1]): - callback = eventArguments[-1] - eventArguments = eventArguments[:-1] - else: - callback = None - channelName = eventKeywords.get('channelName', '') - data = dumps(dict(name=eventName, args=eventArguments)) - self._send_packet(code, channelName, data, callback) + def define(self, Namespace, path=''): + if path: + self._socketIO.connect(path) + namespace = Namespace(self._socketIO, path) + self._namespaceByPath[path] = namespace + return namespace - def get_callback(self, channelName, eventName): - 'Get callback associated with channelName and eventName' - socketIO = self.channelByName[channelName] if channelName else self - try: - return socketIO.callbackByEvent[eventName] - except KeyError: - pass - namespace = socketIO.namespace + def get_namespace(self, path=''): + return self._namespaceByPath[path] - def callback_(*eventArguments): - return namespace.on_(eventName, *eventArguments) - return getattr(namespace, name_callback(eventName), callback_) + def on(self, event, callback, path=''): + return self.get_namespace(path).on(event, callback) - def set_callback(self, callback): - 'Set callback that will be called after receiving an acknowledgment' - self.messageID += 1 - self.namespaceThread.set_callback(self.messageID, callback) - return '%s+' % self.messageID + def message(self, data='', callback=None, path=''): + self._socketIO.message(data, callback, path) - def on(self, eventName, callback): - self.callbackByEvent[eventName] = callback + def emit(self, event, *args, **kw): + self._socketIO.emit(event, *args, **kw) - def wait(self, seconds=None, forCallbacks=False): - if forCallbacks: - self.namespaceThread.wait_for_callbacks(seconds) - elif seconds: - sleep(seconds) + def wait(self, seconds=None): + if seconds: + self._listenerThread.wait(seconds) else: try: while self.connected: @@ -187,149 +160,281 @@ class SocketIO(object): except KeyboardInterrupt: pass - -class Channel(object): - - def __init__(self, socketIO, channelName, Namespace): - self.socketIO = socketIO - self.channelName = channelName - self.namespace = Namespace(self) - self.callbackByEvent = {} - - def disconnect(self): - self.socketIO.disconnect(self.channelName) - - def emit(self, eventName, *eventArguments): - self.socketIO.emit(eventName, *eventArguments, - channelName=self.channelName) - - def message(self, messageData, callback=None): - self.socketIO.message(messageData, callback, - channelName=self.channelName) - - def on(self, eventName, eventCallback): - self.callbackByEvent[eventName] = eventCallback + def wait_for_callbacks(self, seconds=None): + self._listenerThread.wait_for_callbacks(seconds) -class ListenerThread(Thread): - 'Process messages from SocketIO server' +class _RhythmicThread(Thread): + 'Execute call every few seconds' daemon = True - def __init__(self, socketIO): - super(ListenerThread, self).__init__() - self.socketIO = socketIO - self.done = Event() - self.waitingForCallbacks = Event() - self.callbackByMessageID = {} - self.get_callback = self.socketIO.get_callback - - def run(self): - while not self.done.is_set(): - try: - code, packetID, channelName, data = self.socketIO._recv_packet() - except: - continue - try: - delegate = { - 0: self.on_disconnect, - 1: self.on_connect, - 2: self.on_heartbeat, - 3: self.on_message, - 4: self.on_json, - 5: self.on_event, - 6: self.on_acknowledgment, - 7: self.on_error, - }[code] - except KeyError: - continue - delegate(packetID, channelName, data) - - def cancel(self): - self.done.set() - - def wait_for_callbacks(self, seconds): - self.waitingForCallbacks.set() - self.join(seconds) - - def set_callback(self, messageID, callback): - self.callbackByMessageID[messageID] = callback - - def on_disconnect(self, packetID, channelName, data): - callback = self.get_callback(channelName, 'disconnect') - callback() - - def on_connect(self, packetID, channelName, data): - callback = self.get_callback(channelName, 'connect') - callback(self.socketIO) - - def on_heartbeat(self, packetID, channelName, data): - pass - - def on_message(self, packetID, channelName, data): - callback = self.get_callback(channelName, 'message') - callback(data) - - def on_json(self, packetID, channelName, data): - callback = self.get_callback(channelName, 'message') - callback(loads(data)) - - def on_event(self, packetID, channelName, data): - valueByName = loads(data) - eventName = valueByName['name'] - eventArguments = valueByName.get('args', []) - callback = self.get_callback(channelName, eventName) - callback(*eventArguments) - - def on_acknowledgment(self, packetID, channelName, data): - dataParts = data.split('+', 1) - messageID = int(dataParts[0]) - arguments = loads(dataParts[1]) or [] - try: - callback = self.callbackByMessageID[messageID] - except KeyError: - pass - else: - del self.callbackByMessageID[messageID] - callback(*arguments) - callbackCount = len(self.callbackByMessageID) - if self.waitingForCallbacks.is_set() and not callbackCount: - self.cancel() - - def on_error(self, packetID, channelName, data): - reason, advice = data.split('+', 1) - callback = self.get_callback(channelName, 'error') - callback(reason, advice) - - -class RhythmicThread(Thread): - 'Execute rhythmicFunction every few seconds' - - daemon = True - - def __init__(self, intervalInSeconds, rhythmicFunction, *args, **kw): - super(RhythmicThread, self).__init__() + def __init__(self, intervalInSeconds, call, *args, **kw): + super(_RhythmicThread, self).__init__() self.intervalInSeconds = intervalInSeconds - self.rhythmicFunction = rhythmicFunction + self.call = call self.args = args self.kw = kw self.done = Event() def run(self): - try: - while not self.done.is_set(): - self.rhythmicFunction(*self.args, **self.kw) - self.done.wait(self.intervalInSeconds) - except: - pass + while not self.done.is_set(): + self.call(*self.args, **self.kw) + self.done.wait(self.intervalInSeconds) def cancel(self): self.done.set() +class _ListenerThread(Thread): + 'Process messages from socket.io server' + + daemon = True + + def __init__(self, _socketIO, _namespaceByPath): + super(_ListenerThread, self).__init__() + self._socketIO = _socketIO + self._namespaceByPath = _namespaceByPath + self.done = Event() + self.ready = Event() + self.ready.set() + + def cancel(self): + self.done.set() + + def wait(self, seconds): + self.done.wait(seconds) + + def wait_for_callbacks(self, seconds): + self.ready.clear() + self.ready.wait(seconds) + + def get_ackCallback(self, packetID): + return lambda *args: self._socketIO.ack(packetID, *args) + + def run(self): + while not self.done.is_set(): + try: + code, packetID, path, data = self._socketIO.recv_packet() + except SocketIOConnectionError, error: + print error + return + except SocketIOPacketError, error: + print error + continue + try: + namespace = self._namespaceByPath[path] + except KeyError: + print 'Received unexpected path (%s)' % path + continue + try: + delegate = { + '0': self.on_disconnect, + '1': self.on_connect, + '2': self.on_heartbeat, + '3': self.on_message, + '4': self.on_json, + '5': self.on_event, + '6': self.on_ack, + '7': self.on_error, + }[code] + except KeyError: + print 'Received unexpected code (%s)' % code + continue + delegate(packetID, namespace._get_eventCallback, data) + + def on_disconnect(self, packetID, get_eventCallback, data): + get_eventCallback('disconnect')() + + def on_connect(self, packetID, get_eventCallback, data): + get_eventCallback('connect')() + + def on_heartbeat(self, packetID, get_eventCallback, data): + pass + + def on_message(self, packetID, get_eventCallback, data): + args = [data] + if packetID: + args.append(self.get_ackCallback(packetID)) + get_eventCallback('message')(*args) + + def on_json(self, packetID, get_eventCallback, data): + args = [loads(data)] + if packetID: + args.append(self.get_ackCallback(packetID)) + get_eventCallback('message')(*args) + + def on_event(self, packetID, get_eventCallback, data): + valueByName = loads(data) + event = valueByName['name'] + args = valueByName.get('args', []) + if packetID: + args.append(self.get_ackCallback(packetID)) + get_eventCallback(event)(*args) + + def on_ack(self, packetID, get_eventCallback, data): + dataParts = data.split('+', 1) + messageID = int(dataParts[0]) + args = loads(dataParts[1]) if len(dataParts) > 1 else [] + callback = self._socketIO.get_messageCallback(messageID) + if not callback: + return + callback(*args) + if not self._socketIO.has_messageCallback: + self.ready.set() + + def on_error(self, packetID, get_eventCallback, data): + reason, advice = data.split('+', 1) + get_eventCallback('error')(reason, advice) + + +class _SocketIO(object): + 'Low-level interface to remove cyclic references in child threads' + + messageID = 0 + + def __init__(self, host, port, secure, proxies): + baseURL = '%s:%d/socket.io/%s' % (host, port, PROTOCOL) + targetScheme = 'https' if secure else 'http' + targetURL = '%s://%s/' % (targetScheme, baseURL) + try: + response = urlopen(targetURL, proxies=proxies) + except IOError: # pragma: no cover + raise SocketIOError('Could not start connection') + if 200 != response.getcode(): # pragma: no cover + raise SocketIOError('Could not establish connection') + responseParts = response.readline().split(':') + sessionID = responseParts[0] + heartbeatTimeout = int(responseParts[1]) + # connectionTimeout = int(responseParts[2]) + supportedTransports = responseParts[3].split(',') + if 'websocket' not in supportedTransports: + raise SocketIOError('Could not parse handshake') + socketScheme = 'wss' if secure else 'ws' + socketURL = '%s://%s/websocket/%s' % (socketScheme, baseURL, sessionID) + self.connection = create_connection(socketURL) + self.heartbeatInterval = heartbeatTimeout - 2 + self.callbackByMessageID = {} + + def __del__(self): + self.disconnect(close=False) + + def disconnect(self, path='', close=True): + if not self.connected: + return + if path: + self.send_packet(0, path) + elif close: + self.connection.close() + + def connect(self, path): + self.send_packet(1, path) + + def send_heartbeat(self): + try: + self.send_packet(2) + except SocketIOPacketError: + print 'Could not send heartbeat' + pass + + def message(self, data, callback, path): + if isinstance(data, basestring): + code = 3 + packetData = data + else: + code = 4 + packetData = dumps(data, ensure_ascii=False) + self.send_packet(code, path, packetData, callback) + + def emit(self, event, *args, **kw): + callback, args = find_callback(args, kw) + packetData = dumps(dict(name=event, args=args), ensure_ascii=False) + path = kw.get('path', '') + self.send_packet(5, path, packetData, callback) + + def ack(self, packetID, *args): + packetID = packetID.rstrip('+') + packetData = '%s+%s' % ( + packetID, + dumps(args, ensure_ascii=False), + ) if args else packetID + self.send_packet(6, data=packetData) + + def set_messageCallback(self, callback): + 'Set callback that will be called after receiving an acknowledgment' + self.messageID += 1 + self.callbackByMessageID[self.messageID] = callback + return '%s+' % self.messageID + + def get_messageCallback(self, messageID): + try: + callback = self.callbackByMessageID[messageID] + del self.callbackByMessageID[messageID] + return callback + except KeyError: + return + + @property + def has_messageCallback(self): + return True if self.callbackByMessageID else False + + def recv_packet(self): + try: + packet = self.connection.recv() + except WebSocketConnectionClosedException: + text = 'Lost connection (Connection closed)' + raise SocketIOConnectionError(text) + except socket.timeout: + text = 'Lost connection (Connection timed out)' + raise SocketIOConnectionError(text) + except socket.error: + text = 'Lost connection' + raise SocketIOConnectionError(text) + try: + packetParts = packet.split(':', 3) + except AttributeError: + raise SocketIOPacketError('Received invalid packet (%s)' % packet) + packetCount = len(packetParts) + code, packetID, path, data = None, None, None, None + if 4 == packetCount: + code, packetID, path, data = packetParts + elif 3 == packetCount: + code, packetID, path = packetParts + elif 1 == packetCount: + code = packetParts[0] + return code, packetID, path, data + + def send_packet(self, code, path='', data='', callback=None): + packetID = self.set_messageCallback(callback) if callback else '' + packetParts = [str(code), packetID, path, data] + try: + packet = ':'.join(packetParts) + self.connection.send(packet) + except socket.error: + raise SocketIOPacketError('Could not send packet') + + @property + def connected(self): + return self.connection.connected + + class SocketIOError(Exception): pass -def name_callback(eventName): - return 'on_' + eventName.replace(' ', '_') +class SocketIOConnectionError(SocketIOError): + pass + + +class SocketIOPacketError(SocketIOError): + pass + + +def find_callback(args, kw=None): + 'Return callback whether passed as a last argument or as a keyword' + if args and callable(args[-1]): + return args[-1], args[:-1] + try: + return kw['callback'], args + except (KeyError, TypeError): + return None, args diff --git a/socketIO_client/tests.py b/socketIO_client/tests.py index a85c48e..bdbfbd5 100644 --- a/socketIO_client/tests.py +++ b/socketIO_client/tests.py @@ -1,61 +1,174 @@ -from socketIO_client import SocketIO, BaseNamespace -from time import sleep +from socketIO_client import SocketIO, BaseNamespace, find_callback from unittest import TestCase -PAYLOAD = {'bbb': 'ccc'} -ON_RESPONSE_CALLED = False +HOST = 'localhost' +PORT = 8000 +DATA = 'xxx' +PAYLOAD = {'xxx': 'yyy'} class TestSocketIO(TestCase): + def setUp(self): + self.socketIO = SocketIO(HOST, PORT) + self.called_on_response = False + + def tearDown(self): + del self.socketIO + + def on_response(self, *args): + self.called_on_response = True + for arg in args: + if isinstance(arg, dict): + self.assertEqual(arg, PAYLOAD) + else: + self.assertEqual(arg, DATA) + + def is_connected(self, socketIO, connected): + childThreads = [ + socketIO._rhythmicThread, + socketIO._listenerThread, + ] + for childThread in childThreads: + self.assertEqual(not connected, childThread.done.is_set()) + self.assertEqual(connected, socketIO.connected) + def test_disconnect(self): - socketIO = SocketIO('localhost', 8000) - socketIO.disconnect() - self.assertEqual(socketIO.connected, False) + 'Terminate child threads after disconnect' + self.is_connected(self.socketIO, True) + self.socketIO.disconnect() + self.is_connected(self.socketIO, False) + # Use context manager + with SocketIO(HOST, PORT) as self.socketIO: + self.is_connected(self.socketIO, True) + self.is_connected(self.socketIO, False) + + def test_message(self): + 'Message' + self.socketIO.define(Namespace) + self.socketIO.message() + self.socketIO.wait(0.1) + namespace = self.socketIO.get_namespace() + self.assertEqual(namespace.response, 'message_response') + + def test_message_with_data(self): + 'Message with data' + self.socketIO.define(Namespace) + self.socketIO.message(DATA) + self.socketIO.wait(0.1) + namespace = self.socketIO.get_namespace() + self.assertEqual(namespace.response, DATA) + + def test_message_with_payload(self): + 'Message with payload' + self.socketIO.define(Namespace) + self.socketIO.message(PAYLOAD) + self.socketIO.wait(0.1) + namespace = self.socketIO.get_namespace() + self.assertEqual(namespace.response, PAYLOAD) + + def test_message_with_callback(self): + 'Message with callback' + self.socketIO.message(callback=self.on_response) + self.socketIO.wait_for_callbacks(seconds=0.1) + self.assertEqual(self.called_on_response, True) + + def test_message_with_callback_with_data(self): + 'Message with callback with data' + self.socketIO.message(DATA, self.on_response) + self.socketIO.wait_for_callbacks(seconds=0.1) + self.assertEqual(self.called_on_response, True) def test_emit(self): - socketIO = SocketIO('localhost', 8000, Namespace) - socketIO.emit('aaa', PAYLOAD) - sleep(0.5) - self.assertEqual(socketIO.namespace.payload, PAYLOAD) + 'Emit' + self.socketIO.define(Namespace) + self.socketIO.emit('emit') + self.socketIO.wait(0.1) + self.assertEqual(self.socketIO.get_namespace().argsByEvent, { + 'emit_response': (), + }) + + def test_emit_with_payload(self): + 'Emit with payload' + self.socketIO.define(Namespace) + self.socketIO.emit('emit_with_payload', PAYLOAD) + self.socketIO.wait(0.1) + self.assertEqual(self.socketIO.get_namespace().argsByEvent, { + 'emit_with_payload_response': (PAYLOAD,), + }) + + def test_emit_with_multiple_payloads(self): + 'Emit with multiple payloads' + self.socketIO.define(Namespace) + self.socketIO.emit('emit_with_multiple_payloads', PAYLOAD, PAYLOAD) + self.socketIO.wait(0.1) + self.assertEqual(self.socketIO.get_namespace().argsByEvent, { + 'emit_with_multiple_payloads_response': (PAYLOAD, PAYLOAD), + }) def test_emit_with_callback(self): - global ON_RESPONSE_CALLED - ON_RESPONSE_CALLED = False - socketIO = SocketIO('localhost', 8000) - socketIO.emit('aaa', PAYLOAD, on_response) - socketIO.wait(forCallbacks=True) - self.assertEqual(ON_RESPONSE_CALLED, True) + 'Emit with callback' + self.socketIO.emit('emit_with_callback', self.on_response) + self.socketIO.wait_for_callbacks(seconds=0.1) + self.assertEqual(self.called_on_response, True) - def test_events(self): - global ON_RESPONSE_CALLED - ON_RESPONSE_CALLED = False - socketIO = SocketIO('localhost', 8000) - socketIO.on('ddd', on_response) - socketIO.emit('aaa', PAYLOAD) - sleep(0.5) - self.assertEqual(ON_RESPONSE_CALLED, True) + def test_emit_with_callback_with_payload(self): + 'Emit with callback with payload' + self.socketIO.emit('emit_with_callback_with_payload', + self.on_response) + self.socketIO.wait_for_callbacks(seconds=0.1) + self.assertEqual(self.called_on_response, True) - def test_channels(self): - mainSocket = SocketIO('localhost', 8000, Namespace) - chatSocket = mainSocket.connect('/chat', Namespace) - newsSocket = mainSocket.connect('/news', Namespace) - newsSocket.emit('aaa', PAYLOAD) - sleep(0.5) - self.assertNotEqual(mainSocket.namespace.payload, PAYLOAD) - self.assertNotEqual(chatSocket.namespace.payload, PAYLOAD) - self.assertEqual(newsSocket.namespace.payload, PAYLOAD) + def test_emit_with_callback_with_multiple_payloads(self): + 'Emit with callback with multiple payloads' + self.socketIO.emit('emit_with_callback_with_multiple_payloads', + self.on_response) + self.socketIO.wait_for_callbacks(seconds=0.1) + self.assertEqual(self.called_on_response, True) + + def test_emit_with_event(self): + 'Emit to trigger an event' + self.socketIO.on('emit_with_event_response', self.on_response) + self.socketIO.emit('emit_with_event', PAYLOAD) + self.socketIO.wait_for_callbacks(0.1) + self.assertEqual(self.called_on_response, True) + + def test_ack(self): + 'Trigger server callback' + self.socketIO.define(Namespace) + self.socketIO.emit('ack', PAYLOAD) + self.socketIO.wait(0.1) + self.assertEqual(self.socketIO.get_namespace().argsByEvent, { + 'ack_response': (PAYLOAD,), + 'ack_callback_response': (PAYLOAD,), + }) + + def test_namespaces(self): + 'Behave differently in different namespaces' + mainNamespace = self.socketIO.define(Namespace) + chatNamespace = self.socketIO.define(Namespace, '/chat') + newsNamespace = self.socketIO.define(Namespace, '/news') + newsNamespace.emit('emit_with_payload', PAYLOAD) + self.socketIO.wait(0.1) + self.assertEqual(mainNamespace.argsByEvent, {}) + self.assertEqual(chatNamespace.argsByEvent, {}) + self.assertEqual(newsNamespace.argsByEvent, { + 'emit_with_payload_response': (PAYLOAD,), + }) class Namespace(BaseNamespace): - payload = None + def initialize(self): + self.response = None + self.argsByEvent = {} - def on_ddd(self, data): - self.payload = data + def on_message(self, data): + self.response = data - -def on_response(*args): - global ON_RESPONSE_CALLED - ON_RESPONSE_CALLED = True + def on_event(self, event, *args): + callback, args = find_callback(args) + if callback: + callback(*args) + self.argsByEvent[event] = args