diff --git a/CHANGES.rst b/CHANGES.rst index 551eacc..d8a781c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,7 +1,14 @@ +0.1.2 +----- +- Added support for callbacks and channels thanks to Paul Kienzle +- Incorporated suggestions from Josh VanderLinden and Ian Fitzpatrick + 0.1.1 ----- - Added exception handling to destructor in case of connection failure 0.1.0 ----- -- Wrapped code from `StackOverflow `_ +- Wrapped code from StackOverflow_ + +.. _StackOverflow: http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client diff --git a/MANIFEST.in b/MANIFEST.in index f3be32c..23be747 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,3 @@ -recursive-include socketIO * +recursive-include socketIOClient * include *.rst global-exclude *.pyc diff --git a/README.rst b/README.rst index 82110eb..61579fc 100644 --- a/README.rst +++ b/README.rst @@ -1,41 +1,126 @@ -socketIO.client +socketIO-client =============== -Here is a barebones `socket.io `_ client library for Python. +Here is a socket.io_ client library for Python. -Thanks to `rod `_ for his `StackOverflow question and answer `_, on which this code is based. +.. _socket.io: http://socket.io -Thanks also to `liris `_ for his `websocket-client `_ and to `guille `_ for the `socket.io specification `_. +Thanks to rod_ for the `StackOverflow question and answer`__ on which this code is based. + +.. _rod: http://stackoverflow.com/users/370115/rod +.. _StackOverflowQA: http://stackoverflow.com/questions/6692908/formatting-messages-to-send-to-socket-io-node-js-server-from-python-client +__ StackOverflowQA_ + +Thanks to liris_ for websocket-client_ and to guille_ for the `socket.io specification`_. + +.. _liris: https://github.com/liris +.. _websocket-client: https://github.com/liris/websocket-client +.. _guille: https://github.com/guille +.. _socket.io specification: https://github.com/LearnBoost/socket.io-spec + +Thanks to `Paul Kienzle`_, `Josh VanderLinden`_, `Ian Fitzpatrick`_ for submitting code to expand support of the socket.io protocol. + +.. _Paul Kienzle: https://github.com/pkienzle +.. _Josh VanderLinden: https://github.com/codekoala +.. _Ian Fitzpatrick: https://github.com/GraphEffect Installation ------------ :: + VIRTUAL_ENV=$HOME/.virtualenv + # Prepare isolated environment - ENV=$HOME/Projects/env - virtualenv $ENV - mkdir $ENV/opt + virtualenv $VIRTUAL_ENV # Activate isolated environment - source $ENV/bin/activate + source $VIRTUAL_ENV/bin/activate # Install package - easy_install -U socketIO-client + easy_install -U socketIOClient Usage ----- -:: +--- +Activate isolated environment. +.. code-block:: bash - ENV=$HOME/Projects/env - source $ENV/bin/activate - python + VIRTUAL_ENV=$HOME/.virtualenv + source $VIRTUAL_ENV/bin/activate - from socketIO import SocketIO - s = SocketIO('localhost', 8000) - s.emit('news', {'hello': 'world'}) +Emit. +.. code-block:: python + + from socketIOClient import SocketIO + + socketIO = SocketIO('localhost', 8000) + socketIO.emit('aaa', {'bbb': 'ccc'}) + +Emit with callback. +.. code-block:: python + + from socketIOClient import SocketIO + + def on_response(arg1, arg2, arg3, arg4): + print arg1, arg2, arg3, arg4 + + socketIO = SocketIO('localhost', 8000) + socketIO.emit('aaa', {'bbb': 'ccc'}, on_response) + socketIO.wait() + +Define events. +.. code-block:: python + + from socketIOClient import SocketIO + + def on_ddd(arg1, arg2, arg3, arg4): + print arg1, arg2, arg3, arg4 + + socketIO = SocketIO('localhost', 8000) + socketIO.on('ddd', on_ddd) + +Define events in a namespace. +.. code-block:: python + + from socketIOClient import SocketIO, BaseNamespace + + class Namespace(BaseNamespace): + + def on_ddd(self, arg1, arg2): + self.socketIO.emit('eee', {'fff': arg1 + arg2}) + + socketIO = SocketIO('localhost', 8000, Namespace) + +Define standard events. +.. code-block:: python + + from socketIOClient import SocketIO, BaseNamespace + + class Namespace(BaseNamespace): + + def on_connect(self, socketIO): + print '[Connected]' + + def on_disconnect(self): + print '[Disconnected]' + + def on_error(self, name, message): + print '[Error] %s: %s' % (name, message) + + def on_message(self, id, message): + print '[Message] %s: %s' % (id, message) + + socketIO = SocketIO('localhost', 8000, Namespace) + +Define different behavior for different channels on a single socket. +.. code-block:: python + + mainSocket = SocketIO('localhost', 8000, MainNamespace()) + chatSocket = mainSocket.connect('/chat', ChatNamespace()) + newsSocket = mainSocket.connect('/news', NewsNamespace()) License ------- -This software is available under the MIT License. Please see LICENSE.txt for the full license text. +This software is available under the MIT License. diff --git a/TODO.rst b/TODO.rst new file mode 100644 index 0000000..f8bee38 --- /dev/null +++ b/TODO.rst @@ -0,0 +1 @@ +- Consider enabling multiple callbacks for a single event diff --git a/serve_tests.py b/serve_tests.py new file mode 100755 index 0000000..47c946a --- /dev/null +++ b/serve_tests.py @@ -0,0 +1,29 @@ +'Launch this server in another terminal window before running tests' +from socketio import socketio_manage +from socketio.namespace import BaseNamespace +from socketio.server import SocketIOServer + + +class Namespace(BaseNamespace): + + def on_aaa(self, *args): + self.socket.send_packet(dict( + type='event', + name='ddd', + args=args, + endpoint=self.ns_name)) + + +class Application(object): + + def __call__(self, environ, start_response): + socketio_manage(environ, { + '': Namespace, + '/chat': Namespace, + '/news': Namespace, + }) + + +if __name__ == '__main__': + socketIOServer = SocketIOServer(('0.0.0.0', 8000), Application()) + socketIOServer.serve_forever() diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 index 35d6170..3c25b65 --- a/setup.py +++ b/setup.py @@ -1,7 +1,8 @@ import os - from setuptools import setup, find_packages +from socketIO import __version__ + here = os.path.abspath(os.path.dirname(__file__)) README = open(os.path.join(here, 'README.rst')).read() @@ -9,10 +10,10 @@ CHANGES = open(os.path.join(here, 'CHANGES.rst')).read() setup( - name='socketIO-client', - version='0.1.1', - description='Barebones socket.io client library', - long_description=README + '\n\n' + CHANGES, + name='socketIOClient', + version=__version__, + description='A socket.io client library', + long_description=README + '\n\n' + CHANGES, license='MIT', classifiers=[ 'Intended Audience :: Developers', @@ -21,9 +22,10 @@ setup( ], keywords='socket.io node.js', author='Roy Hyunjin Han', - author_email='starsareblueandfaraway@gmail.com', - url='https://github.com/invisibleroads/socketIO-client', + author_email='rhh@crosscompute.com', + url='https://github.com/invisibleroads/socketIOClient', install_requires=[ + 'anyjson', 'websocket-client', ], packages=find_packages(), diff --git a/socketIO/__init__.py b/socketIO/__init__.py deleted file mode 100644 index f27e324..0000000 --- a/socketIO/__init__.py +++ /dev/null @@ -1,71 +0,0 @@ -from simplejson import dumps -from threading import Thread, Event -from urllib import urlopen -from websocket import create_connection - - -class SocketIO(object): - - def __init__(self, host, port): - self.host = host - self.port = int(port) - self.__do_handshake() - self.__connect() - self.heartbeatThread = RhythmicThread(self.heartbeatTimeout - 2, self.__send_heartbeat) - self.heartbeatThread.start() - - def __do_handshake(self): - try: - response = urlopen('http://%s:%d/socket.io/1/' % (self.host, self.port)) - except IOError: - raise SocketIOError('Could not start connection') - if 200 != response.getcode(): - raise SocketIOError('Could not establish connection') - self.sessionID, heartbeatTimeout, connectionTimeout, supportedTransports = response.readline().split(':') - self.heartbeatTimeout = int(heartbeatTimeout) - self.connectionTimeout = int(connectionTimeout) - if 'websocket' not in supportedTransports.split(','): - raise SocketIOError('Could not parse handshake') - - def __connect(self): - self.connection = create_connection('ws://%s:%d/socket.io/1/websocket/%s' % (self.host, self.port, self.sessionID)) - - def __del__(self): - try: - self.heartbeatThread.cancel() - self.connection.close() - except AttributeError: - pass - - def __send_heartbeat(self): - self.connection.send('2::') - - def emit(self, eventName, eventData): - self.connection.send('5:::' + dumps(dict(name=eventName, args=eventData))) - - -class SocketIOError(Exception): - pass - - -class RhythmicThread(Thread): - 'Execute function every few seconds' - - daemon = True - - def __init__(self, intervalInSeconds, function, *args, **kw): - super(RhythmicThread, self).__init__() - self.intervalInSeconds = intervalInSeconds - self.function = function - self.args = args - self.kw = kw - self.done = Event() - - def cancel(self): - self.done.set() - - def run(self): - self.done.wait(self.intervalInSeconds) - while not self.done.is_set(): - self.function(*self.args, **self.kw) - self.done.wait(self.intervalInSeconds) diff --git a/socketIOClient/__init__.py b/socketIOClient/__init__.py new file mode 100644 index 0000000..fefbdea --- /dev/null +++ b/socketIOClient/__init__.py @@ -0,0 +1,319 @@ +import websocket +from anyjson import dumps, loads +from threading import Thread, Event +from urllib import urlopen + + +__version__ = '0.1.2' + + +PROTOCOL = 1 # SocketIO protocol version + + +class BaseNamespace(object): + + def __init__(self, socketIO): + self.socketIO = socketIO + + def on_connect(self, socketIO): + pass + + def on_disconnect(self): + pass + + def on_error(self, reason, advice): + print '[Error] %s' % advice + + def on_message(self, messageData): + print '[Message] %s' % messageData + + def on_(self, eventName, *eventArguments): + print '[Event] %s%s' % (eventName, eventArguments) + + def on_open(self, *args): + print '[Open]', args + + def on_close(self, *args): + print '[Close]', args + + def on_retry(self, *args): + print '[Retry]', args + + def on_reconnect(self, *args): + print '[Reconnect]', args + + +class SocketIO(object): + + messageID = 0 + + def __init__(self, host, port, Namespace=BaseNamespace): + self.host = host + self.port = int(port) + self.namespace = Namespace(self) + self.__connect() + + heartbeatInterval = self.heartbeatTimeout - 2 + self.heartbeatThread = RhythmicThread(heartbeatInterval, + self._send_heartbeat) + self.heartbeatThread.start() + + self.channelByName = {} + self.callbackByEvent = {} + self.namespaceThread = ListenerThread(self) + self.namespaceThread.start() + + def __del__(self): + self.heartbeatThread.cancel() + self.namespaceThread.cancel() + self.connection.close() + + def __connect(self): + baseURL = '%s:%d/socket.io/%s' % (self.host, self.port, PROTOCOL) + try: + response = urlopen('http://%s/' % baseURL) + except IOError: + raise SocketIOError('Could not start connection') + if 200 != response.getcode(): + raise SocketIOError('Could not establish connection') + responseParts = response.readline().split(':') + self.sessionID = responseParts[0] + self.heartbeatTimeout = int(responseParts[1]) + self.connectionTimeout = int(responseParts[2]) + self.supportedTransports = responseParts[3].split(',') + if 'websocket' not in self.supportedTransports: + raise SocketIOError('Could not parse handshake') + socketURL = 'ws://%s/websocket/%s' % (baseURL, self.sessionID) + self.connection = websocket.create_connection(socketURL) + + def _recv_packet(self): + packetID, channelName, data = None, None, None + try: + packet = self.connection.recv() + packetParts = packet.split(':', 3) + except (websocket.WebSocketException, AttributeError): + return 0, packetID, channelName, data + packetCount = len(packetParts) + if 4 == packetCount: + code, packetID, channelName, data = packetParts + elif 3 == packetCount: + code, packetID, channelName = packetParts + elif 1 == packetCount: + code = packetParts[0] + else: + raise ValueError('Could not parse packet:\n' + packet) + return int(code), packetID, channelName, data + + def _send_packet(self, code, channelName='', data='', callback=None): + try: + self.connection.send(':'.join([ + str(code), + self.set_callback(callback) if callback else '', + channelName, + data])) + except Exception, e: + pass + + def disconnect(self, channelName=''): + self._send_packet(0, channelName) + if channelName: + del self.channelByName[channelName] + + @property + def connected(self): + return self.connection.connected + + def connect(self, channelName, Namespace=BaseNamespace): + channel = Channel(self, channelName, Namespace) + self.channelByName[channelName] = channel + self._send_packet(1, channelName) + return channel + + def _send_heartbeat(self): + self._send_packet(2) + + def message(self, messageData, callback=None, channelName=''): + if isinstance(messageData, basestring): + code = 3 + data = messageData + else: + code = 4 + data = dumps(messageData) + self._send_packet(code, channelName, data, callback) + + def emit(self, eventName, *eventArguments, **eventKeywords): + code = 5 + if callable(eventArguments[-1]): + callback = eventArguments[-1] + eventArguments = eventArguments[:-1] + else: + callback = None + channelName = eventKeywords.get('channelName', '') + data = dumps(dict(name=eventName, args=eventArguments)) + self._send_packet(code, channelName, data, callback) + + def get_callback(self, channelName, eventName): + 'Get callback associated with channelName and eventName' + socketIO = self.channelByName[channelName] if channelName else self + try: + return socketIO.callbackByEvent[eventName] + except KeyError: + pass + namespace = socketIO.namespace + + def callback_(*eventArguments): + return namespace.on_(eventName, *eventArguments) + return getattr(namespace, name_callback(eventName), callback_) + + def set_callback(self, callback): + 'Set callback that will be called after receiving an acknowledgment' + self.messageID += 1 + self.namespaceThread.set_callback(self.messageID, callback) + return '%s+' % self.messageID + + def on(self, eventName, callback): + self.callbackByEvent[eventName] = callback + + def wait(self): + self.namespaceThread.wait() + + +class Channel(object): + + def __init__(self, socketIO, channelName, Namespace): + self.socketIO = socketIO + self.channelName = channelName + self.namespace = Namespace(self) + self.callbackByEvent = {} + + def disconnect(self): + self.socketIO.disconnect(self.channelName) + + def emit(self, eventName, *eventArguments): + self.socketIO.emit(eventName, *eventArguments, + channelName=self.channelName) + + def message(self, messageData, callback=None): + self.socketIO.message(messageData, callback, + channelName=self.channelName) + + def on(self, eventName, eventCallback): + self.callbackByEvent[eventName] = eventCallback + + +class ListenerThread(Thread): + 'Process messages from SocketIO server' + + daemon = True + + def __init__(self, socketIO): + super(ListenerThread, self).__init__() + self.socketIO = socketIO + self.done = Event() + self.waiting = Event() + self.callbackByMessageID = {} + self.get_callback = self.socketIO.get_callback + + def run(self): + while not self.done.is_set(): + code, packetID, channelName, data = self.socketIO._recv_packet() + delegate = { + 0: self.on_disconnect, + 1: self.on_connect, + 2: self.on_heartbeat, + 3: self.on_message, + 4: self.on_json, + 5: self.on_event, + 6: self.on_acknowledgment, + 7: self.on_error, + }[code] + delegate(packetID, channelName, data) + + def cancel(self): + self.done.set() + + def wait(self): + self.waiting.set() + self.join() + + def set_callback(self, messageID, callback): + self.callbackByMessageID[messageID] = callback + + def on_disconnect(self, packetID, channelName, data): + callback = self.get_callback(channelName, 'disconnect') + callback() + + def on_connect(self, packetID, channelName, data): + callback = self.get_callback(channelName, 'connect') + callback(self.socketIO) + + def on_heartbeat(self, packetID, channelName, data): + pass + + def on_message(self, packetID, channelName, data): + callback = self.get_callback(channelName, 'message') + callback(data) + + def on_json(self, packetID, channelName, data): + callback = self.get_callback(channelName, 'message') + callback(loads(data)) + + def on_event(self, packetID, channelName, data): + valueByName = loads(data) + eventName = valueByName['name'] + eventArguments = valueByName['args'] + callback = self.get_callback(channelName, eventName) + callback(*eventArguments) + + def on_acknowledgment(self, packetID, channelName, data): + dataParts = data.split('+', 1) + messageID = int(dataParts[0]) + print data + print dataParts + print dataParts[1] + arguments = loads(dataParts[1]) or [] + try: + callback = self.callbackByMessageID[messageID] + except KeyError: + pass + else: + del self.callbackByMessageID[messageID] + callback(*arguments) + if self.waiting.is_set() and not len(self.callbackByMessageID): + self.cancel() + + def on_error(self, packetID, channelName, data): + reason, advice = data.split('+', 1) + callback = self.get_callback(channelName, 'error') + callback(reason, advice) + + +class RhythmicThread(Thread): + 'Execute rhythmicFunction every few seconds' + + daemon = True + + def __init__(self, intervalInSeconds, rhythmicFunction, *args, **kw): + super(RhythmicThread, self).__init__() + self.intervalInSeconds = intervalInSeconds + self.rhythmicFunction = rhythmicFunction + self.args = args + self.kw = kw + self.done = Event() + + def run(self): + self.done.wait(self.intervalInSeconds) + while not self.done.is_set(): + self.rhythmicFunction(*self.args, **self.kw) + self.done.wait(self.intervalInSeconds) + + def cancel(self): + self.done.set() + + +class SocketIOError(Exception): + pass + + +def name_callback(eventName): + return 'on_' + eventName.replace(' ', '_') diff --git a/socketIOClient/tests.py b/socketIOClient/tests.py new file mode 100644 index 0000000..409e9bb --- /dev/null +++ b/socketIOClient/tests.py @@ -0,0 +1,56 @@ +from socketIOClient import SocketIO, BaseNamespace +from time import sleep +from unittest import TestCase + + +PAYLOAD = {'bbb': 'ccc'} +ON_RESPONSE_CALLED = False + + +class TestSocketIO(TestCase): + + def test_emit(self): + socketIO = SocketIO('localhost', 8000, Namespace) + socketIO.emit('aaa', PAYLOAD) + sleep(0.5) + self.assertEqual(socketIO.namespace.payload, PAYLOAD) + + def test_emit_with_callback(self): + global ON_RESPONSE_CALLED + ON_RESPONSE_CALLED = False + socketIO = SocketIO('localhost', 8000) + socketIO.emit('aaa', PAYLOAD, on_response) + socketIO.wait() + self.assertEqual(ON_RESPONSE_CALLED, True) + + def test_events(self): + global ON_RESPONSE_CALLED + ON_RESPONSE_CALLED = False + socketIO = SocketIO('localhost', 8000) + socketIO.on('ddd', on_response) + socketIO.emit('aaa', PAYLOAD) + sleep(0.5) + self.assertEqual(ON_RESPONSE_CALLED, True) + + def test_channels(self): + mainSocket = SocketIO('localhost', 8000, Namespace) + chatSocket = mainSocket.connect('/chat', Namespace) + newsSocket = mainSocket.connect('/news', Namespace) + newsSocket.emit('aaa', PAYLOAD) + sleep(0.5) + self.assertNotEqual(mainSocket.namespace.payload, PAYLOAD) + self.assertNotEqual(chatSocket.namespace.payload, PAYLOAD) + self.assertEqual(newsSocket.namespace.payload, PAYLOAD) + + +class Namespace(BaseNamespace): + + payload = None + + def on_ddd(self, data): + self.payload = data + + +def on_response(*args): + global ON_RESPONSE_CALLED + ON_RESPONSE_CALLED = True