diff --git a/TODO.rst b/TODO.rst index e362fcd..b15199b 100644 --- a/TODO.rst +++ b/TODO.rst @@ -1,7 +1,8 @@ + Fix unittests + Fix exceptions when websocket server disappears - Fix thread exceptions + Finish creating low-level _SocketIO to eliminate cyclic references + Move namespace and callback handling to _ListenerThread Integrate Zac's fork #6 Integrate Sajal's fork #7 diff --git a/experiments/callableweakref-tests.py b/experiments/callableweakref-tests.py new file mode 100644 index 0000000..c7a95ec --- /dev/null +++ b/experiments/callableweakref-tests.py @@ -0,0 +1,170 @@ +# Note 1: The expected behavior of weakref to an instance is that it should +# return None if no other strong references to the instance exist, +# signalling that the instance can be safely garbage collected. +# +# Note 2: The expected behavior of weakref to a method is that it should +# return None if no other strong references to the parent instance exist, +# signalling that the parent instance can be safely garbage collected. +# +# Note 3: The IPython interpreter stores its own references and will +# produce results different from those of the default Python interpreter. +import callableweakref +import weakref +import unittest + + +class TestCallableWeakRef(unittest.TestCase): + + def test_instanceDirectWeakref_dies_on_arrival(self): + 'Assert that weakref works as expected' + instanceDirectWeakref = weakref.ref(Instance()) + assert instanceDirectWeakref() is None + + def test_instanceIndirectWeakref_dies_when_instance_dies(self): + 'Assert that weakref works as expected' + instance = Instance() + instanceIndirectWeakref = weakref.ref(instance) + assert instanceIndirectWeakref() is instance + del instance + assert instanceIndirectWeakref() is None + + def test_boundMethodDirectWeakref_dies_on_arrival(self): + 'Assert that weakref does not work as expected' + instance = Instance() + boundMethodDirectWeakref = weakref.ref(instance.call) + assert boundMethodDirectWeakref() is None # Should be instance + + def test_boundMethodIndirectWeakref_lives_when_instance_dies(self): + 'Assert that weakref works as expected' + instance = Instance() + boundMethod = instance.call + boundMethodIndirectWeakref = weakref.ref(boundMethod) + assert boundMethodIndirectWeakref() is boundMethod + del instance + assert boundMethodIndirectWeakref() is boundMethod + del boundMethod + assert boundMethodIndirectWeakref() is None + + def test_unboundMethodDirectWeakref_dies_on_arrival(self): + 'Assert that weakref does not work as expected' + unboundMethodDirectWeakref = weakref.ref(Instance.call) + assert unboundMethodDirectWeakref() is None # Should be Instance.call + + def test_unboundMethodIndirectWeakref_dies_when_unboundMethod_dies(self): + 'Assert that weakref works as expected' + unboundMethod = Instance.call + unboundMethodIndirectWeakref = weakref.ref(unboundMethod) + assert unboundMethodIndirectWeakref() is unboundMethod + del unboundMethod + assert unboundMethodIndirectWeakref() is None + + def test_classFunctionDirectWeakref_dies_on_arrival(self): + 'Assert that weakref works as expected' + classFunctionDirectWeakref = weakref.ref(Instance()) + assert classFunctionDirectWeakref() is None + + def test_classFunctionIndirectWeakref_dies_when_classFunction_dies(self): + 'Assert that weakref works as expected' + classFunction = Instance() + classFunctionIndirectWeakref = weakref.ref(classFunction) + assert classFunctionIndirectWeakref() is classFunction + del classFunction + assert classFunctionIndirectWeakref() is None + + def test_normalFunctionDirectWeakref_dies_when_normal_function_dies(self): + 'Assert that weakref works as expected' + call = lambda: None + normalFunctionDirectWeakref = weakref.ref(call) + assert normalFunctionDirectWeakref() is call + del call + assert normalFunctionDirectWeakref() is None + + def test_normalFunctionIndirectWeakref_dies_when_normal_function_dies(self): + 'Assert that weakref works as expected' + call = lambda: None + normalFunction = call + normalFunctionIndirectWeakref = weakref.ref(normalFunction) + assert normalFunctionIndirectWeakref() is normalFunction + del normalFunction + assert normalFunctionIndirectWeakref() is call + del call + assert normalFunctionIndirectWeakref() is None + + # Assert that CallableWeakref works as expected for callables + + def test_boundMethodDirectCallableWeakref_dies_when_instance_dies(self): + instance = Instance() + boundMethodDirectCallableWeakref = callableweakref.ref(instance.call) + self.assertEqual(boundMethodDirectCallableWeakref(), instance.call) + del instance + assert boundMethodDirectCallableWeakref() is None + + def test_boundMethodIndirectCallableWeakref_dies_when_instance_dies(self): + instance = Instance() + boundMethod = instance.call + boundMethodIndirectCallableWeakref = callableweakref.ref(boundMethod) + self.assertEqual(boundMethodIndirectCallableWeakref(), boundMethod) + del instance + self.assertEqual(boundMethodIndirectCallableWeakref(), boundMethod) + del boundMethod + assert boundMethodIndirectCallableWeakref() is None + + def test_unboundMethodDirectCallableWeakref_lives_on_arrival(self): + unboundMethodDirectCallableWeakref = callableweakref.ref(Instance.call) + self.assertEqual(unboundMethodDirectCallableWeakref(), Instance.call) + + def test_unboundMethodIndirectCallableWeakref_lives_on_arrival(self): + unboundMethod = Instance.call + unboundMethodIndirectCallableWeakref = callableweakref.ref(unboundMethod) + self.assertEqual(unboundMethodIndirectCallableWeakref(), unboundMethod) + + def test_classMethodIndirectCallableWeakref_lives_on_arrival(self): + classMethod = Instance.call_classmethod + classMethodIndirectCallableWeakref = callableweakref.ref(classMethod) + self.assertEqual(classMethodIndirectCallableWeakref(), classMethod) + + def test_staticMethodIndirectCallableWeakref_lives_on_arrival(self): + staticMethod = Instance.call_staticmethod + staticMethodIndirectCallableWeakref = callableweakref.ref(staticMethod) + self.assertEqual(staticMethodIndirectCallableWeakref(), staticMethod) + + def test_classFunctionDirectCallableWeakref_dies_on_arrival(self): + classFunctionDirectCallableWeakref = callableweakref.ref(Instance()) + assert classFunctionDirectCallableWeakref() is None + + def test_classFunctionIndirectCallableWeakref_dies_when_classFunction_dies(self): + classFunction = Instance() + classFunctionIndirectCallableWeakref = callableweakref.ref(classFunction) + self.assertEqual(classFunctionIndirectCallableWeakref(), classFunction.__call__) + del classFunction + assert classFunctionIndirectCallableWeakref() is None + + def test_normalFunctionDirectCallableWeakref_lives_on_arrival(self): + call = lambda: None + normalFunctionDirectCallableWeakref = callableweakref.ref(call) + self.assertEqual(normalFunctionDirectCallableWeakref(), call) + + def test_normalFunctionIndirectCallableWeakref_lives_on_arrival(self): + call = lambda: None + normalFunction = call + normalFunctionIndirectWeakref = callableweakref.ref(normalFunction) + self.assertEqual(normalFunctionIndirectWeakref(), normalFunction) + del normalFunction + self.assertEqual(normalFunctionIndirectWeakref(), call) + + +class Instance(object): + + def __call__(self): + pass + + def call(self): + pass + + @classmethod + def call_classmethod(Class): + pass + + @staticmethod + def call_staticmethod(): + pass diff --git a/experiments/callableweakref.py b/experiments/callableweakref.py new file mode 100644 index 0000000..788592c --- /dev/null +++ b/experiments/callableweakref.py @@ -0,0 +1,52 @@ +import types +import weakref + + +def ref(function): + return CallableWeakReference(function) + + +class CallableWeakReference(object): + + def __init__(self, function): + 'Create a weak reference to a callable' + try: + if function.im_self: + # We have a bound method or class method + self._reference = weakref.ref(function.im_self) + else: + # We have an unbound method + self._reference = None + self._function = function.im_func + self._class = function.im_class + except AttributeError: + try: + function.func_code + # We have a normal function or static method + self._reference = None + self._function = function + self._class = None + except AttributeError: + function = function.__call__ + # We have a class masquerading as a function + self._reference = weakref.ref(function.im_self) + self._function = function.im_func + self._class = function.im_class + + def __call__(self): + if self.dead: + return + if self._reference: + # We have a bound method + return types.MethodType(self._function, self._reference(), self._class) + elif self._class: + return types.MethodType(self._function, None, self._class) + else: + return self._function + + @property + def dead(self): + if self._reference and not self._reference(): + # We have a bound method whose parent reference has died + return True + return False diff --git a/experiments/t0.py b/experiments/t0.py new file mode 100644 index 0000000..3980052 --- /dev/null +++ b/experiments/t0.py @@ -0,0 +1,6 @@ +from socketIO_client import SocketIO + +s = SocketIO('localhost', 8000) +del s +from time import sleep +sleep(3) diff --git a/experiments/t1.py b/experiments/t1.py new file mode 100644 index 0000000..0955ed6 --- /dev/null +++ b/experiments/t1.py @@ -0,0 +1,7 @@ +class O(object): + + def __del__(self): + print '__del__()' + + +o = O() diff --git a/experiments/t10.py b/experiments/t10.py new file mode 100644 index 0000000..24733d9 --- /dev/null +++ b/experiments/t10.py @@ -0,0 +1,16 @@ +class A(object): + + def __init__(self): + self.b = B(self) + + def __del__(self): + print '__del__()' + + +class B(object): + + def __init__(self, a): + self.a = a + + +a = A() diff --git a/experiments/t11.py b/experiments/t11.py new file mode 100644 index 0000000..11121de --- /dev/null +++ b/experiments/t11.py @@ -0,0 +1,21 @@ +class A(object): + + def __init__(self): + self.c = Common() + self.b = B(self.c) + + def __del__(self): + print '__del__()' + + +class B(object): + + def __init__(self, c): + self.c = c + + +class Common(object): + pass + + +a = A() diff --git a/experiments/t12.py b/experiments/t12.py new file mode 100644 index 0000000..b4e9bb3 --- /dev/null +++ b/experiments/t12.py @@ -0,0 +1,32 @@ +# Will the destructor of a class be called if its two children have cyclic +# references to each other? Yes. + + +class Parent(object): + + def __init__(self): + self.child = Child() + + def __del__(self): + print 'Parent.__del__()' + + +class Child(object): + + def __init__(self): + self.grandChild = GrandChild(self) + + def __del__(self): + print 'Child.__del__()' + + +class GrandChild(object): + + def __init__(self, parent): + self.parent = parent + + def __del__(self): + print 'GrandChild.__del__()' + + +parent = Parent() diff --git a/experiments/t2.py b/experiments/t2.py new file mode 100644 index 0000000..3ad124c --- /dev/null +++ b/experiments/t2.py @@ -0,0 +1,19 @@ +import weakref + + +class O(object): + + def __init__(self): + self.p = P(self) + + def __del__(self): + print '__del__()' + + +class P(object): + + def __init__(self, parent): + self.parent = weakref.ref(parent) + + +o = O() diff --git a/experiments/t3.py b/experiments/t3.py new file mode 100644 index 0000000..2de3123 --- /dev/null +++ b/experiments/t3.py @@ -0,0 +1,19 @@ +class O(object): + + def __init__(self): + self.p = P(self.f) + + def __del__(self): + print '__del__()' + + def f(self): + pass + + +class P(object): + + def __init__(self, parentMethod): + self.parentMethod = parentMethod + + +o = O() diff --git a/experiments/t4.py b/experiments/t4.py new file mode 100644 index 0000000..da46dfe --- /dev/null +++ b/experiments/t4.py @@ -0,0 +1,26 @@ +import weakref + + +class O(object): + + def __init__(self): + self.p = P(self.f) + + def __del__(self): + print '__del__()' + + def f(self): + pass + + +class P(object): + + def __init__(self, parentMethod): + self.parentMethod = weakref.ref(parentMethod) + + def show(self): + print self.parentMethod + + +o = O() +o.p.show() # Dead on arrival diff --git a/experiments/t5.py b/experiments/t5.py new file mode 100644 index 0000000..a2ff710 --- /dev/null +++ b/experiments/t5.py @@ -0,0 +1,30 @@ +import callableweakref + + +class O(object): + + def __init__(self): + self.p = P(self.f) + + def __del__(self): + print '__del__()' + + def f(self): + print 'f()' + + +class P(object): + + def __init__(self, parentMethod): + self.parentMethod = callableweakref.ref(parentMethod) + + def show(self): + print self.parentMethod + + def run(self): + self.parentMethod()() + + +o = O() +o.p.show() +o.p.run() diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 493748a..9ae06a2 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,18 +1,13 @@ -import sys -import traceback - import socket +import weakref from anyjson import dumps, loads from functools import partial -from threading import Thread, Event +from threading import Event, Thread from time import sleep from urllib import urlopen from websocket import WebSocketConnectionClosedException, create_connection -__version__ = '0.4' - - PROTOCOL = 1 # SocketIO protocol version @@ -49,112 +44,64 @@ class BaseNamespace(object): # pragma: no cover print '[Reconnect]', args +class Channel(object): + + def __init__(self, socketIO, channelName, Namespace): + self._socketIO = weakref.proxy(socketIO) + self._channelName = channelName + self._namespace = Namespace(self) + self._callbackByEvent = {} + + def disconnect(self): + self._socketIO.disconnect(self._channelName) + + def emit(self, eventName, *eventArguments): + self._socketIO.emit(eventName, *eventArguments, channelName=self._channelName) + + def message(self, messageData, callback=None): + self._socketIO.message(messageData, callback, channelName=self._channelName) + + def on(self, eventName, eventCallback): + self._callbackByEvent[eventName] = eventCallback + + class SocketIO(object): - _messageID = 0 - def __init__(self, host, port, Namespace=BaseNamespace, secure=False, proxies=None): - self._host = host - self._port = int(port) - self._namespace = Namespace(self) - self._secure = secure - self._proxies = proxies - self._connect() + self._socketIO = _SocketIO(host, port, secure, proxies) - heartbeatInterval = self._heartbeatTimeout - 2 - self._heartbeatThread = RhythmicThread(heartbeatInterval, self._send_heartbeat) + self._heartbeatThread = _RhythmicThread( + self._socketIO.heartbeatTimeout, + self._socketIO.send_heartbeat) self._heartbeatThread.start() - self._channelByName = {} - self._callbackByEvent = {} - self._namespaceThread = ListenerThread(self._recv_packet, self._get_callback) + self._namespace = Namespace(self._socketIO) + self._namespaceThread = _ListenerThread(self._socketIO) self._namespaceThread.start() def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): - self.__del__() + self.disconnect() def __del__(self): - self._heartbeatThread.cancel() - self._namespaceThread.cancel() - self._connection.close() - - def _connect(self): - baseURL = '%s:%d/socket.io/%s' % (self._host, self._port, PROTOCOL) - try: - response = urlopen('%s://%s/' % ( - 'https' if self._secure else 'http', baseURL), - proxies=self._proxies) - except IOError: # pragma: no cover - raise SocketIOError('Could not start connection') - if 200 != response.getcode(): # pragma: no cover - raise SocketIOError('Could not establish connection') - responseParts = response.readline().split(':') - self._sessionID = responseParts[0] - self._heartbeatTimeout = int(responseParts[1]) - self._connectionTimeout = int(responseParts[2]) - self._supportedTransports = responseParts[3].split(',') - if 'websocket' not in self._supportedTransports: - raise SocketIOError('Could not parse handshake') # pragma: no cover - socketURL = '%s://%s/websocket/%s' % ( - 'wss' if self._secure else 'ws', baseURL, self._sessionID) - self._connection = create_connection(socketURL) - - def _recv_packet(self): - code, packetID, channelName, data = -1, None, None, None - try: - packet = self._connection.recv() - except WebSocketConnectionClosedException: - raise SocketIOConnectionError('Lost connection (Connection closed)') - except socket.timeout: - raise SocketIOConnectionError('Lost connection (Connection timed out)') - try: - packetParts = packet.split(':', 3) - except AttributeError: - raise SocketIOPacketError('Received invalid packet (%s)' % packet) - packetCount = len(packetParts) - if 4 == packetCount: - code, packetID, channelName, data = packetParts - elif 3 == packetCount: - code, packetID, channelName = packetParts - elif 1 == packetCount: # pragma: no cover - code = packetParts[0] - return int(code), packetID, channelName, data - - def _send_packet(self, code, channelName='', data='', callback=None): - callbackNumber = self._set_callback(callback) if callback else '' - packetParts = [str(code), callbackNumber, channelName, data] - try: - self._connection.send(':'.join(packetParts)) - except socket.error: - raise SocketIOPacketError('Could not send packet') + self.disconnect() def disconnect(self, channelName=''): self._send_packet(0, channelName) if channelName: - del self._channelByName[channelName] + del self.channelByName[channelName] else: - self.__del__() - - @property - def connected(self): - return self._connection.connected + self._heartbeatThread.cancel() + self._namespaceThread.cancel() def connect(self, channelName, Namespace=BaseNamespace): channel = Channel(self, channelName, Namespace) - self._channelByName[channelName] = channel - self._send_packet(1, channelName) + self.channelByName[channelName] = channel + self.send_packet(1, channelName) return channel - def _send_heartbeat(self): - try: - self._send_packet(2) - except SocketIOPacketError: - print 'Could not send heartbeat' - pass - def message(self, messageData, callback=None, channelName=''): if isinstance(messageData, basestring): code = 3 @@ -174,25 +121,6 @@ class SocketIO(object): data = dumps(dict(name=eventName, args=eventArguments)) self._send_packet(code, channelName, data, callback) - def _get_callback(self, channelName, eventName): - 'Get callback associated with channelName and eventName' - socketIO = self._channelByName[channelName] if channelName else self - try: - return socketIO._callbackByEvent[eventName] - except KeyError: - pass - - def callback_(*eventArguments): - return socketIO._namespace.on_(eventName, *eventArguments) - callbackName = 'on_' + eventName.replace(' ', '_') - return getattr(socketIO._namespace, callbackName, callback_) - - def _set_callback(self, callback): - 'Set callback that will be called after receiving an acknowledgment' - self._messageID += 1 - self._namespaceThread.set_callback(self._messageID, callback) - return '%s+' % self._messageID - def on(self, eventName, callback): self._callbackByEvent[eventName] = callback @@ -203,75 +131,179 @@ class SocketIO(object): sleep(seconds) else: try: - while self.connected: + while self._socketIO.connected: sleep(1) except KeyboardInterrupt: pass -class Channel(object): +class _SocketIO(object): + 'Low-level interface to remove cyclic references in child threads' - def __init__(self, socketIO, channelName, Namespace): - self._socketIO = socketIO - self._channelName = channelName - self._namespace = Namespace(self) - self._callbackByEvent = {} + messageID = 0 - def disconnect(self): - self._socketIO.disconnect(self._channelName) + def __init__(self, host, port, secure=False, proxies=None): + self.connect(host, port, secure, proxies) + self.callbackByMessageID = {} + self.callbackByEvent = {} + self.channelByName = {} - def emit(self, eventName, *eventArguments): - self._socketIO.emit(eventName, *eventArguments, channelName=self._channelName) + def __del__(self): + self.connection.close() - def message(self, messageData, callback=None): - self._socketIO.message(messageData, callback, channelName=self._channelName) + def connect(self, host, port, secure, proxies): + baseURL = '%s:%d/socket.io/%s' % (host, port, PROTOCOL) + targetScheme = 'https' if secure else 'http' + targetURL = '%s://%s/' % (targetScheme, baseURL) + try: + response = urlopen(targetURL, proxies=proxies) + except IOError: # pragma: no cover + raise SocketIOError('Could not start connection') + if 200 != response.getcode(): # pragma: no cover + raise SocketIOError('Could not establish connection') + responseParts = response.readline().split(':') + sessionID = responseParts[0] + heartbeatTimeout = int(responseParts[1]) + # connectionTimeout = int(responseParts[2]) + supportedTransports = responseParts[3].split(',') + if 'websocket' not in supportedTransports: + raise SocketIOError('Could not parse handshake') # pragma: no cover + socketScheme = 'wss' if secure else 'ws' + socketURL = '%s://%s/websocket/%s' % (socketScheme, baseURL, sessionID) + self.connection = create_connection(socketURL) + self.heartbeatInterval = heartbeatTimeout - 2 - def on(self, eventName, eventCallback): - self._callbackByEvent[eventName] = eventCallback + def recv_packet(self): + code, packetID, channelName, data = -1, None, None, None + try: + packet = self.connection.recv() + except WebSocketConnectionClosedException: + raise SocketIOConnectionError('Lost connection (Connection closed)') + except socket.timeout: + raise SocketIOConnectionError('Lost connection (Connection timed out)') + try: + packetParts = packet.split(':', 3) + except AttributeError: + raise SocketIOPacketError('Received invalid packet (%s)' % packet) + packetCount = len(packetParts) + if 4 == packetCount: + code, packetID, channelName, data = packetParts + elif 3 == packetCount: + code, packetID, channelName = packetParts + elif 1 == packetCount: # pragma: no cover + code = packetParts[0] + return int(code), packetID, channelName, data + + def send_packet(self, code, channelName='', data='', callback=None): + callbackNumber = self.set_messageID_callback(callback) if callback else '' + packetParts = [str(code), callbackNumber, channelName, data] + try: + self.connection.send(':'.join(packetParts)) + except socket.error: + raise SocketIOPacketError('Could not send packet') + + def set_messageID_callback(self, callback): + 'Set callback that will be called after receiving an acknowledgment' + self.messageID += 1 + self.callbackByMessageID[self.messageID] = callback + return '%s+' % self.messageID + + def get_messageID_callback(self, messageID): + 'Get callback associated with messageID' + try: + callback = self.callbackByMessageID[messageID] + del self.callbackByMessageID[messageID] + return callback + except KeyError: + return + + @property + def has_messageID_callback(self): + return True if self.callbackByMessageID else False + + def get_event_callback(self, channelName, eventName): + 'Get callback associated with channelName and eventName' + _socketIO = self.channelByName[channelName] if channelName else self + try: + return _socketIO.callbackByEvent[eventName] + except KeyError: + pass + + def callback_(*eventArguments): + return _socketIO.namespace.on_(eventName, *eventArguments) + callbackName = 'on_' + eventName.replace(' ', '_') + return getattr(_socketIO.namespace, callbackName, callback_) + + @property + def connected(self): + return self.connection.connected + + def send_heartbeat(self): + try: + self.send_packet(2) + except SocketIOPacketError: + print 'Could not send heartbeat' + pass -class ListenerThread(Thread): +class _RhythmicThread(Thread): + 'Execute call every few seconds' + + daemon = True + + def __init__(self, intervalInSeconds, call, *args, **kw): + super(_RhythmicThread, self).__init__() + self.intervalInSeconds = intervalInSeconds + self.call + self.args = args + self.kw = kw + self.done = Event() + + def run(self): + while not self.done.is_set(): + self.call(*self.args, **self.kw) + self.done.wait(self.intervalInSeconds) + + def cancel(self): + self.done.set() + + +class _ListenerThread(Thread): 'Process messages from SocketIO server' daemon = True - def __init__(self, recv_packet, get_callback): - super(ListenerThread, self).__init__() + def __init__(self, _socketIO, get_event_callback, ): + super(_ListenerThread, self).__init__() + self._socketIO = _socketIO self.done = Event() self.waitingForCallbacks = Event() - self.callbackByMessageID = {} - self.recv_packet = recv_packet - self.get_callback = get_callback def run(self): - try: - while not self.done.is_set(): - try: - code, packetID, channelName, data = self.recv_packet() - except SocketIOConnectionError, error: - print error - return - except SocketIOPacketError, error: - print error - continue - get_channel_callback = partial(self.get_callback, channelName) - try: - delegate = { - 0: self.on_disconnect, - 1: self.on_connect, - 2: self.on_heartbeat, - 3: self.on_message, - 4: self.on_json, - 5: self.on_event, - 6: self.on_acknowledgment, - 7: self.on_error, - }[code] - except KeyError: - continue - delegate(packetID, get_channel_callback, data) - except: - exc_type, exc_value, exc_traceback = sys.exc_info() - open('tracebacks.log', 'a+t').write('\n'.join(traceback.format_tb(exc_traceback))) + while not self.done.is_set(): + try: + code, packetID, channelName, data = self._socketIO.recv_packet() + except SocketIOConnectionError, error: + print error + return + except SocketIOPacketError, error: + print error + continue + get_channel_callback = partial(self._socketIO.get_event_callback, channelName) + try: + delegate = { + 0: self.on_disconnect, + 1: self.on_connect, + 2: self.on_heartbeat, + 3: self.on_message, + 4: self.on_json, + 5: self.on_event, + 6: self.on_acknowledgment, + 7: self.on_error, + }[code] + except KeyError: + continue + delegate(packetID, get_channel_callback, data) def cancel(self): self.done.set() @@ -280,9 +312,6 @@ class ListenerThread(Thread): self.waitingForCallbacks.set() self.join(seconds) - def set_callback(self, messageID, callback): - self.callbackByMessageID[messageID] = callback - def on_disconnect(self, packetID, get_channel_callback, data): get_channel_callback('disconnect')() @@ -308,48 +337,18 @@ class ListenerThread(Thread): dataParts = data.split('+', 1) messageID = int(dataParts[0]) arguments = loads(dataParts[1]) or [] - try: - callback = self.callbackByMessageID[messageID] - except KeyError: - pass - else: - del self.callbackByMessageID[messageID] - callback(*arguments) - callbackCount = len(self.callbackByMessageID) - if self.waitingForCallbacks.is_set() and not callbackCount: - self.cancel() + callback = self._socketIO.get_messageID_callback(messageID) + if not callback: + return + callback(*arguments) + if self.waitingForCallbacks.is_set() and not self._socketIO.has_messageID_callback: + self.cancel() def on_error(self, packetID, get_channel_callback, data): reason, advice = data.split('+', 1) get_channel_callback('error')(reason, advice) -class RhythmicThread(Thread): - 'Execute call every few seconds' - - daemon = True - - def __init__(self, intervalInSeconds, call, *args, **kw): - super(RhythmicThread, self).__init__() - self.intervalInSeconds = intervalInSeconds - self.call = call - self.args = args - self.kw = kw - self.done = Event() - - def run(self): - try: - while not self.done.is_set(): - self.call(*self.args, **self.kw) - self.done.wait(self.intervalInSeconds) - except: - exc_type, exc_value, exc_traceback = sys.exc_info() - open('tracebacks.log', 'a+t').write('\n'.join(traceback.format_tb(exc_traceback))) - - def cancel(self): - self.done.set() - - class SocketIOError(Exception): pass