From ce5aceb7d9fc0e0b6cc4347658266e5bebd3d162 Mon Sep 17 00:00:00 2001 From: Roy Hyunjin Han Date: Fri, 10 Aug 2012 19:08:16 -0400 Subject: [PATCH] Improved exception handling in heartbeatThread and namespaceThread --- CHANGES.rst | 6 ++ README.rst | 42 ++++++++++---- TODO.rst | 1 - setup.cfg | 5 ++ socketIO_client/__init__.py | 106 +++++++++++++++++++++--------------- socketIO_client/tests.py | 7 ++- 6 files changed, 110 insertions(+), 57 deletions(-) create mode 100644 setup.cfg diff --git a/CHANGES.rst b/CHANGES.rst index 94e8c72..8afe01b 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,9 @@ +0.3 +--- +- Added support for secure connections +- Added socketIO.wait() +- Improved exception handling in heartbeatThread and namespaceThread + 0.2 --- - Added support for callbacks and channels thanks to Paul Kienzle diff --git a/README.rst b/README.rst index 853cf00..bba2b1d 100644 --- a/README.rst +++ b/README.rst @@ -38,27 +38,29 @@ Emit. :: socketIO = SocketIO('localhost', 8000) socketIO.emit('aaa', {'bbb': 'ccc'}) + socketIO.wait(seconds=1) Emit with callback. :: from socketIO_client import SocketIO - def on_response(arg1, arg2, arg3, arg4): - print arg1, arg2, arg3, arg4 + def on_response(*args): + print args socketIO = SocketIO('localhost', 8000) socketIO.emit('aaa', {'bbb': 'ccc'}, on_response) - socketIO.wait() + socketIO.wait(forCallbacks=True) Define events. :: from socketIO_client import SocketIO - def on_ddd(arg1, arg2, arg3, arg4): - print arg1, arg2, arg3, arg4 + def on_ddd(*args): + print args socketIO = SocketIO('localhost', 8000) socketIO.on('ddd', on_ddd) + socketIO.wait() Define events in a namespace. :: @@ -66,10 +68,11 @@ Define events in a namespace. :: class Namespace(BaseNamespace): - def on_ddd(self, arg1, arg2): - self.socketIO.emit('eee', {'fff': arg1 + arg2}) + def on_ddd(self, *args): + self.socketIO.emit('eee', {'fff': 'ggg'}) socketIO = SocketIO('localhost', 8000, Namespace) + socketIO.wait() Define standard events. :: @@ -90,12 +93,31 @@ Define standard events. :: print '[Message] %s: %s' % (id, message) socketIO = SocketIO('localhost', 8000, Namespace) + socketIO.wait() Define different behavior for different channels on a single socket. :: - mainSocket = SocketIO('localhost', 8000, MainNamespace()) - chatSocket = mainSocket.connect('/chat', ChatNamespace()) - newsSocket = mainSocket.connect('/news', NewsNamespace()) + from socketIO_client import SocketIO, BaseNamespace + + class MainNamespace(BaseNamespace): + + def on_aaa(self, *args): + print 'aaa', args + + class ChatNamespace(BaseNamespace): + + def on_bbb(self, *args): + print 'bbb', args + + class NewsNamespace(BaseNamespace): + + def on_ccc(self, *args): + print 'ccc', args + + mainSocket = SocketIO('localhost', 8000, MainNamespace) + chatSocket = mainSocket.connect('/chat', ChatNamespace) + newsSocket = mainSocket.connect('/news', NewsNamespace) + mainSocket.wait() License diff --git a/TODO.rst b/TODO.rst index f8bee38..e69de29 100644 --- a/TODO.rst +++ b/TODO.rst @@ -1 +0,0 @@ -- Consider enabling multiple callbacks for a single event diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..8a2014f --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[nosetests] +detailed-errors=TRUE +with-coverage=TRUE +cover-package=socketIO_client +cover-erase=TRUE diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index a3313b0..e32996d 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,16 +1,17 @@ import websocket from anyjson import dumps, loads from threading import Thread, Event +from time import sleep from urllib import urlopen -__version__ = '0.2' +__version__ = '0.3' PROTOCOL = 1 # SocketIO protocol version -class BaseNamespace(object): +class BaseNamespace(object): # pragma: no cover def __init__(self, socketIO): self.socketIO = socketIO @@ -47,10 +48,11 @@ class SocketIO(object): messageID = 0 - def __init__(self, host, port, Namespace=BaseNamespace): + def __init__(self, host, port, Namespace=BaseNamespace, secure=False): self.host = host self.port = int(port) self.namespace = Namespace(self) + self.secure = secure self.__connect() heartbeatInterval = self.heartbeatTimeout - 2 @@ -63,7 +65,7 @@ class SocketIO(object): self.namespaceThread = ListenerThread(self) self.namespaceThread.start() - def __del__(self): + def __del__(self): # pragma: no cover self.heartbeatThread.cancel() self.namespaceThread.cancel() self.connection.close() @@ -71,10 +73,11 @@ class SocketIO(object): def __connect(self): baseURL = '%s:%d/socket.io/%s' % (self.host, self.port, PROTOCOL) try: - response = urlopen('http://%s/' % baseURL) - except IOError: + response = urlopen('%s://%s/' % ( + 'https' if self.secure else 'http', baseURL)) + except IOError: # pragma: no cover raise SocketIOError('Could not start connection') - if 200 != response.getcode(): + if 200 != response.getcode(): # pragma: no cover raise SocketIOError('Could not establish connection') responseParts = response.readline().split(':') self.sessionID = responseParts[0] @@ -82,26 +85,22 @@ class SocketIO(object): self.connectionTimeout = int(responseParts[2]) self.supportedTransports = responseParts[3].split(',') if 'websocket' not in self.supportedTransports: - raise SocketIOError('Could not parse handshake') - socketURL = 'ws://%s/websocket/%s' % (baseURL, self.sessionID) + raise SocketIOError('Could not parse handshake') # pragma: no cover + socketURL = '%s://%s/websocket/%s' % ( + 'wss' if self.secure else 'ws', baseURL, self.sessionID) self.connection = websocket.create_connection(socketURL) def _recv_packet(self): - packetID, channelName, data = None, None, None - try: - packet = self.connection.recv() - packetParts = packet.split(':', 3) - except (websocket.WebSocketException, AttributeError): - return 0, packetID, channelName, data + code, packetID, channelName, data = -1, None, None, None + packet = self.connection.recv() + packetParts = packet.split(':', 3) packetCount = len(packetParts) if 4 == packetCount: code, packetID, channelName, data = packetParts elif 3 == packetCount: code, packetID, channelName = packetParts - elif 1 == packetCount: + elif 1 == packetCount: # pragma: no cover code = packetParts[0] - else: - raise ValueError('Could not parse packet:\n' + packet) return int(code), packetID, channelName, data def _send_packet(self, code, channelName='', data='', callback=None): @@ -115,6 +114,8 @@ class SocketIO(object): self._send_packet(0, channelName) if channelName: del self.channelByName[channelName] + else: + self.__del__() @property def connected(self): @@ -129,8 +130,8 @@ class SocketIO(object): def _send_heartbeat(self): try: self._send_packet(2) - except TypeError: - pass + except: + self.__del__() def message(self, messageData, callback=None, channelName=''): if isinstance(messageData, basestring): @@ -174,8 +175,17 @@ class SocketIO(object): def on(self, eventName, callback): self.callbackByEvent[eventName] = callback - def wait(self): - self.namespaceThread.wait() + def wait(self, seconds=None, forCallbacks=False): + if forCallbacks: + self.namespaceThread.wait_for_callbacks(seconds) + elif seconds: + sleep(seconds) + else: + try: + while self.connected: + sleep(1) + except KeyboardInterrupt: + pass class Channel(object): @@ -210,31 +220,37 @@ class ListenerThread(Thread): super(ListenerThread, self).__init__() self.socketIO = socketIO self.done = Event() - self.waiting = Event() + self.waitingForCallbacks = Event() self.callbackByMessageID = {} self.get_callback = self.socketIO.get_callback def run(self): while not self.done.is_set(): - code, packetID, channelName, data = self.socketIO._recv_packet() - delegate = { - 0: self.on_disconnect, - 1: self.on_connect, - 2: self.on_heartbeat, - 3: self.on_message, - 4: self.on_json, - 5: self.on_event, - 6: self.on_acknowledgment, - 7: self.on_error, - }[code] + try: + code, packetID, channelName, data = self.socketIO._recv_packet() + except: + continue + try: + delegate = { + 0: self.on_disconnect, + 1: self.on_connect, + 2: self.on_heartbeat, + 3: self.on_message, + 4: self.on_json, + 5: self.on_event, + 6: self.on_acknowledgment, + 7: self.on_error, + }[code] + except KeyError: + continue delegate(packetID, channelName, data) def cancel(self): self.done.set() - def wait(self): - self.waiting.set() - self.join() + def wait_for_callbacks(self, seconds): + self.waitingForCallbacks.set() + self.join(seconds) def set_callback(self, messageID, callback): self.callbackByMessageID[messageID] = callback @@ -268,9 +284,6 @@ class ListenerThread(Thread): def on_acknowledgment(self, packetID, channelName, data): dataParts = data.split('+', 1) messageID = int(dataParts[0]) - print data - print dataParts - print dataParts[1] arguments = loads(dataParts[1]) or [] try: callback = self.callbackByMessageID[messageID] @@ -279,7 +292,8 @@ class ListenerThread(Thread): else: del self.callbackByMessageID[messageID] callback(*arguments) - if self.waiting.is_set() and not len(self.callbackByMessageID): + callbackCount = len(self.callbackByMessageID) + if self.waitingForCallbacks.is_set() and not callbackCount: self.cancel() def on_error(self, packetID, channelName, data): @@ -302,10 +316,12 @@ class RhythmicThread(Thread): self.done = Event() def run(self): - self.done.wait(self.intervalInSeconds) - while not self.done.is_set(): - self.rhythmicFunction(*self.args, **self.kw) - self.done.wait(self.intervalInSeconds) + try: + while not self.done.is_set(): + self.rhythmicFunction(*self.args, **self.kw) + self.done.wait(self.intervalInSeconds) + except: + pass def cancel(self): self.done.set() diff --git a/socketIO_client/tests.py b/socketIO_client/tests.py index 241d37e..a85c48e 100644 --- a/socketIO_client/tests.py +++ b/socketIO_client/tests.py @@ -9,6 +9,11 @@ ON_RESPONSE_CALLED = False class TestSocketIO(TestCase): + def test_disconnect(self): + socketIO = SocketIO('localhost', 8000) + socketIO.disconnect() + self.assertEqual(socketIO.connected, False) + def test_emit(self): socketIO = SocketIO('localhost', 8000, Namespace) socketIO.emit('aaa', PAYLOAD) @@ -20,7 +25,7 @@ class TestSocketIO(TestCase): ON_RESPONSE_CALLED = False socketIO = SocketIO('localhost', 8000) socketIO.emit('aaa', PAYLOAD, on_response) - socketIO.wait() + socketIO.wait(forCallbacks=True) self.assertEqual(ON_RESPONSE_CALLED, True) def test_events(self):