Improved exception handling in heartbeatThread and namespaceThread

This commit is contained in:
Roy Hyunjin Han 2012-08-10 19:08:16 -04:00
commit ce5aceb7d9
6 changed files with 110 additions and 57 deletions

View file

@ -1,3 +1,9 @@
0.3
---
- Added support for secure connections
- Added socketIO.wait()
- Improved exception handling in heartbeatThread and namespaceThread
0.2
---
- Added support for callbacks and channels thanks to Paul Kienzle

View file

@ -38,27 +38,29 @@ Emit. ::
socketIO = SocketIO('localhost', 8000)
socketIO.emit('aaa', {'bbb': 'ccc'})
socketIO.wait(seconds=1)
Emit with callback. ::
from socketIO_client import SocketIO
def on_response(arg1, arg2, arg3, arg4):
print arg1, arg2, arg3, arg4
def on_response(*args):
print args
socketIO = SocketIO('localhost', 8000)
socketIO.emit('aaa', {'bbb': 'ccc'}, on_response)
socketIO.wait()
socketIO.wait(forCallbacks=True)
Define events. ::
from socketIO_client import SocketIO
def on_ddd(arg1, arg2, arg3, arg4):
print arg1, arg2, arg3, arg4
def on_ddd(*args):
print args
socketIO = SocketIO('localhost', 8000)
socketIO.on('ddd', on_ddd)
socketIO.wait()
Define events in a namespace. ::
@ -66,10 +68,11 @@ Define events in a namespace. ::
class Namespace(BaseNamespace):
def on_ddd(self, arg1, arg2):
self.socketIO.emit('eee', {'fff': arg1 + arg2})
def on_ddd(self, *args):
self.socketIO.emit('eee', {'fff': 'ggg'})
socketIO = SocketIO('localhost', 8000, Namespace)
socketIO.wait()
Define standard events. ::
@ -90,12 +93,31 @@ Define standard events. ::
print '[Message] %s: %s' % (id, message)
socketIO = SocketIO('localhost', 8000, Namespace)
socketIO.wait()
Define different behavior for different channels on a single socket. ::
mainSocket = SocketIO('localhost', 8000, MainNamespace())
chatSocket = mainSocket.connect('/chat', ChatNamespace())
newsSocket = mainSocket.connect('/news', NewsNamespace())
from socketIO_client import SocketIO, BaseNamespace
class MainNamespace(BaseNamespace):
def on_aaa(self, *args):
print 'aaa', args
class ChatNamespace(BaseNamespace):
def on_bbb(self, *args):
print 'bbb', args
class NewsNamespace(BaseNamespace):
def on_ccc(self, *args):
print 'ccc', args
mainSocket = SocketIO('localhost', 8000, MainNamespace)
chatSocket = mainSocket.connect('/chat', ChatNamespace)
newsSocket = mainSocket.connect('/news', NewsNamespace)
mainSocket.wait()
License

View file

@ -1 +0,0 @@
- Consider enabling multiple callbacks for a single event

5
setup.cfg Normal file
View file

@ -0,0 +1,5 @@
[nosetests]
detailed-errors=TRUE
with-coverage=TRUE
cover-package=socketIO_client
cover-erase=TRUE

View file

@ -1,16 +1,17 @@
import websocket
from anyjson import dumps, loads
from threading import Thread, Event
from time import sleep
from urllib import urlopen
__version__ = '0.2'
__version__ = '0.3'
PROTOCOL = 1 # SocketIO protocol version
class BaseNamespace(object):
class BaseNamespace(object): # pragma: no cover
def __init__(self, socketIO):
self.socketIO = socketIO
@ -47,10 +48,11 @@ class SocketIO(object):
messageID = 0
def __init__(self, host, port, Namespace=BaseNamespace):
def __init__(self, host, port, Namespace=BaseNamespace, secure=False):
self.host = host
self.port = int(port)
self.namespace = Namespace(self)
self.secure = secure
self.__connect()
heartbeatInterval = self.heartbeatTimeout - 2
@ -63,7 +65,7 @@ class SocketIO(object):
self.namespaceThread = ListenerThread(self)
self.namespaceThread.start()
def __del__(self):
def __del__(self): # pragma: no cover
self.heartbeatThread.cancel()
self.namespaceThread.cancel()
self.connection.close()
@ -71,10 +73,11 @@ class SocketIO(object):
def __connect(self):
baseURL = '%s:%d/socket.io/%s' % (self.host, self.port, PROTOCOL)
try:
response = urlopen('http://%s/' % baseURL)
except IOError:
response = urlopen('%s://%s/' % (
'https' if self.secure else 'http', baseURL))
except IOError: # pragma: no cover
raise SocketIOError('Could not start connection')
if 200 != response.getcode():
if 200 != response.getcode(): # pragma: no cover
raise SocketIOError('Could not establish connection')
responseParts = response.readline().split(':')
self.sessionID = responseParts[0]
@ -82,26 +85,22 @@ class SocketIO(object):
self.connectionTimeout = int(responseParts[2])
self.supportedTransports = responseParts[3].split(',')
if 'websocket' not in self.supportedTransports:
raise SocketIOError('Could not parse handshake')
socketURL = 'ws://%s/websocket/%s' % (baseURL, self.sessionID)
raise SocketIOError('Could not parse handshake') # pragma: no cover
socketURL = '%s://%s/websocket/%s' % (
'wss' if self.secure else 'ws', baseURL, self.sessionID)
self.connection = websocket.create_connection(socketURL)
def _recv_packet(self):
packetID, channelName, data = None, None, None
try:
packet = self.connection.recv()
packetParts = packet.split(':', 3)
except (websocket.WebSocketException, AttributeError):
return 0, packetID, channelName, data
code, packetID, channelName, data = -1, None, None, None
packet = self.connection.recv()
packetParts = packet.split(':', 3)
packetCount = len(packetParts)
if 4 == packetCount:
code, packetID, channelName, data = packetParts
elif 3 == packetCount:
code, packetID, channelName = packetParts
elif 1 == packetCount:
elif 1 == packetCount: # pragma: no cover
code = packetParts[0]
else:
raise ValueError('Could not parse packet:\n' + packet)
return int(code), packetID, channelName, data
def _send_packet(self, code, channelName='', data='', callback=None):
@ -115,6 +114,8 @@ class SocketIO(object):
self._send_packet(0, channelName)
if channelName:
del self.channelByName[channelName]
else:
self.__del__()
@property
def connected(self):
@ -129,8 +130,8 @@ class SocketIO(object):
def _send_heartbeat(self):
try:
self._send_packet(2)
except TypeError:
pass
except:
self.__del__()
def message(self, messageData, callback=None, channelName=''):
if isinstance(messageData, basestring):
@ -174,8 +175,17 @@ class SocketIO(object):
def on(self, eventName, callback):
self.callbackByEvent[eventName] = callback
def wait(self):
self.namespaceThread.wait()
def wait(self, seconds=None, forCallbacks=False):
if forCallbacks:
self.namespaceThread.wait_for_callbacks(seconds)
elif seconds:
sleep(seconds)
else:
try:
while self.connected:
sleep(1)
except KeyboardInterrupt:
pass
class Channel(object):
@ -210,31 +220,37 @@ class ListenerThread(Thread):
super(ListenerThread, self).__init__()
self.socketIO = socketIO
self.done = Event()
self.waiting = Event()
self.waitingForCallbacks = Event()
self.callbackByMessageID = {}
self.get_callback = self.socketIO.get_callback
def run(self):
while not self.done.is_set():
code, packetID, channelName, data = self.socketIO._recv_packet()
delegate = {
0: self.on_disconnect,
1: self.on_connect,
2: self.on_heartbeat,
3: self.on_message,
4: self.on_json,
5: self.on_event,
6: self.on_acknowledgment,
7: self.on_error,
}[code]
try:
code, packetID, channelName, data = self.socketIO._recv_packet()
except:
continue
try:
delegate = {
0: self.on_disconnect,
1: self.on_connect,
2: self.on_heartbeat,
3: self.on_message,
4: self.on_json,
5: self.on_event,
6: self.on_acknowledgment,
7: self.on_error,
}[code]
except KeyError:
continue
delegate(packetID, channelName, data)
def cancel(self):
self.done.set()
def wait(self):
self.waiting.set()
self.join()
def wait_for_callbacks(self, seconds):
self.waitingForCallbacks.set()
self.join(seconds)
def set_callback(self, messageID, callback):
self.callbackByMessageID[messageID] = callback
@ -268,9 +284,6 @@ class ListenerThread(Thread):
def on_acknowledgment(self, packetID, channelName, data):
dataParts = data.split('+', 1)
messageID = int(dataParts[0])
print data
print dataParts
print dataParts[1]
arguments = loads(dataParts[1]) or []
try:
callback = self.callbackByMessageID[messageID]
@ -279,7 +292,8 @@ class ListenerThread(Thread):
else:
del self.callbackByMessageID[messageID]
callback(*arguments)
if self.waiting.is_set() and not len(self.callbackByMessageID):
callbackCount = len(self.callbackByMessageID)
if self.waitingForCallbacks.is_set() and not callbackCount:
self.cancel()
def on_error(self, packetID, channelName, data):
@ -302,10 +316,12 @@ class RhythmicThread(Thread):
self.done = Event()
def run(self):
self.done.wait(self.intervalInSeconds)
while not self.done.is_set():
self.rhythmicFunction(*self.args, **self.kw)
self.done.wait(self.intervalInSeconds)
try:
while not self.done.is_set():
self.rhythmicFunction(*self.args, **self.kw)
self.done.wait(self.intervalInSeconds)
except:
pass
def cancel(self):
self.done.set()

View file

@ -9,6 +9,11 @@ ON_RESPONSE_CALLED = False
class TestSocketIO(TestCase):
def test_disconnect(self):
socketIO = SocketIO('localhost', 8000)
socketIO.disconnect()
self.assertEqual(socketIO.connected, False)
def test_emit(self):
socketIO = SocketIO('localhost', 8000, Namespace)
socketIO.emit('aaa', PAYLOAD)
@ -20,7 +25,7 @@ class TestSocketIO(TestCase):
ON_RESPONSE_CALLED = False
socketIO = SocketIO('localhost', 8000)
socketIO.emit('aaa', PAYLOAD, on_response)
socketIO.wait()
socketIO.wait(forCallbacks=True)
self.assertEqual(ON_RESPONSE_CALLED, True)
def test_events(self):