From f34c7014fb766e8ec2e30d199ce7622d3e8c8abb Mon Sep 17 00:00:00 2001 From: Roy Hyunjin Han Date: Thu, 19 Feb 2015 21:39:09 -0500 Subject: [PATCH] Passed a unit test --- .travis.yml | 2 +- socketIO_client/__init__.py | 309 ++++++++++++++++++++---------------- 2 files changed, 170 insertions(+), 141 deletions(-) diff --git a/.travis.yml b/.travis.yml index af9d4b0..2d8f75d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ before_install: - sudo apt-get update - sudo apt-get install nodejs install: - - npm install -G socket.io + - npm install -G socket.io@1.3.1 - npm install -G http-proxy - pip install -U requests - pip install -U six diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 1860de4..99a9a2d 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -3,6 +3,7 @@ import logging import requests import threading import time +from collections import namedtuple from .exceptions import PacketError, TimeoutError from .transports import _get_response @@ -10,10 +11,116 @@ from .transports import _get_response __version__ = '0.6.1' _log = logging.getLogger(__name__) +EngineIOData = namedtuple('EngineIOData', ['data']) +SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'event', 'args']) TRANSPORTS = [] RETRY_INTERVAL_IN_SECONDS = 1 +class EngineIONamespace(object): + 'Define engine.io client behavior' + + def __init__(self, io): + self._io = io + self._callback_by_event = {} + self.initialize() + + def initialize(self): + 'Initialize custom variables here; you can override this method' + pass + + def on_event(self, event, *args): + """ + Called after server sends an event; you can override this method. + Called only if a custom event handler does not exist, + such as one defined by namespace.on('my_event', my_function). + """ + callback, args = find_callback(args) + if callback: + callback(*args) + + def _find_packet_callback(self, event): + # Check callbacks defined by on() + try: + return self._callback_by_event[event] + except KeyError: + pass + # Check callbacks defined explicitly or use on_event() + return getattr( + self, 'on_' + event.replace(' ', '_'), + lambda *args: self.on_event(event, *args)) + + +class SocketIONamespace(EngineIONamespace): + 'Define socket.io client behavior' + + def __init__(self, io, path): + self.path = path + super(SocketIONamespace, self).__init__(io) + + def on_connect(self): + 'Called after server connects; you can override this method' + + def on_reconnect(self): + 'Called after server reconnects; you can override this method' + + def on_disconnect(self): + 'Called after server disconnects; you can override this method' + + def _find_packet_callback(self, event): + # Interpret events + if event == 'connect': + if not hasattr(self, '_was_connected'): + self._was_connected = True + else: + event = 'reconnect' + return super(SocketIONamespace, self)._find_packet_callback(event) + + +class LoggingMixin(object): + + def _log(self, level, msg, *attrs): + _log.log(level, '%s: %s' % (self._io._url, msg), *attrs) + + +class LoggingEngineIONamespace(EngineIONamespace, LoggingMixin): + + def on_event(self, event, *args): + callback, args = find_callback(args) + arguments = [repr(_) for _ in args] + if callback: + arguments.append('callback(*args)') + self._log( + logging.INFO, '[event] %s(%s)', + event, ', '.join(arguments)) + super(LoggingEngineIONamespace, self).on_event(event, *args) + + +class LoggingSocketIONamespace(SocketIONamespace, LoggingMixin): + + def on_event(self, event, *args): + callback, args = find_callback(args) + arguments = [repr(_) for _ in args] + if callback: + arguments.append('callback(*args)') + self._log( + logging.INFO, '%s [event] %s(%s)', self.path, + event, ', '.join(arguments)) + super(LoggingSocketIONamespace, self).on_event(event, *args) + + def on_connect(self): + self._log(logging.DEBUG, '%s [connect]', self.path) + super(LoggingSocketIONamespace, self).on_connect() + + def on_reconnect(self): + self._log(logging.DEBUG, '%s [reconnect]', self.path) + super(LoggingSocketIONamespace, self).on_reconnect() + + def on_disconnect(self): + self._log(logging.DEBUG, '%s [disconnect]', self.path) + super(LoggingSocketIONamespace, self).on_disconnect() + + class EngineIO(object): _engineIO_protocol = 3 @@ -38,12 +145,12 @@ class EngineIO(object): engineIO_packets = _decode_content(response.content) engineIO_packet_type, engineIO_packet_data = engineIO_packets[0] assert engineIO_packet_type == 0 - packet_json = json.loads(engineIO_packet_data) - print packet_json - # packet_json['upgrades'] - self._ping_interval = packet_json['pingInterval'] / float(1000) - self._ping_timeout = packet_json['pingTimeout'] / float(1000) - self._session_id = packet_json['sid'] + value_by_name = json.loads(engineIO_packet_data) + print(value_by_name) + # value_by_name['upgrades'] + self._ping_interval = value_by_name['pingInterval'] / float(1000) + self._ping_timeout = value_by_name['pingTimeout'] / float(1000) + self._session_id = value_by_name['sid'] if Namespace: self.define(Namespace) @@ -250,8 +357,6 @@ class SocketIO(EngineIO): except KeyError: raise PacketError( 'unexpected socket.io packet type (%s)' % socketIO_packet_type) - print socketIO_packet_data_parsed - print namespace._find_packet_callback delegate(socketIO_packet_data_parsed, namespace._find_packet_callback) return socketIO_packet_data @@ -262,7 +367,11 @@ class SocketIO(EngineIO): find_packet_callback('disconnect')() def _on_event(self, data_parsed, find_packet_callback): - pass + args = data_parsed.args + if data_parsed.ack_id: + args.append(self._prepare_to_send_ack( + data_parsed.path, data_parsed.ack_id)) + find_packet_callback(data_parsed.event)(*args) def _on_ack(self, data_parsed, find_packet_callback): pass @@ -276,109 +385,44 @@ class SocketIO(EngineIO): def _on_binary_ack(self, data_parsed, find_packet_callback): pass - -class EngineIONamespace(object): - 'Define engine.io client behavior' - - def __init__(self, io): - self._io = io - self._callback_by_event = {} - self.initialize() - - def initialize(self): - 'Initialize custom variables here; you can override this method' - pass - - def on_event(self, event, *args): - """ - Called after server sends an event; you can override this method. - Called only if a custom event handler does not exist, - such as one defined by namespace.on('my_event', my_function). - """ - callback, args = find_callback(args) - if callback: - callback(*args) - - def _find_packet_callback(self, event): - # Check callbacks defined by on() - try: - return self._callback_by_event[event] - except KeyError: - pass - # Check callbacks defined explicitly or use on_event() - return getattr( - self, 'on_' + event.replace(' ', '_'), - lambda *args: self.on_event(event, *args)) + def _prepare_to_send_ack(self, path, ack_id): + 'Return function that acknowledges the server' + return lambda *args: self._ack(path, ack_id, *args) -class SocketIONamespace(EngineIONamespace): - 'Define socket.io client behavior' +class HeartbeatThread(threading.Thread): - def __init__(self, io, path): - self.path = path - super(SocketIONamespace, self).__init__(io) + daemon = True - def on_connect(self): - 'Called after server connects; you can override this method' + def __init__( + self, send_heartbeat, + relax_interval_in_seconds, + hurry_interval_in_seconds): + super(HeartbeatThread, self).__init__() + self._send_heartbeat = send_heartbeat + self._relax_interval_in_seconds = relax_interval_in_seconds + self._hurry_interval_in_seconds = hurry_interval_in_seconds + self._adrenaline = threading.Event() + self._rest = threading.Event() + self._stop = threading.Event() - def on_reconnect(self): - 'Called after server reconnects; you can override this method' - - def on_disconnect(self): - 'Called after server disconnects; you can override this method' - - def _find_packet_callback(self, event): - # Interpret events - if event == 'connect': - if not hasattr(self, '_was_connected'): - self._was_connected = True + def run(self): + while not self._stop.is_set(): + self._send_heartbeat() + if self._adrenaline.is_set(): + interval_in_seconds = self._hurry_interval_in_seconds else: - event = 'reconnect' - return super(SocketIONamespace, self)._find_packet_callback(event) + interval_in_seconds = self._relax_interval_in_seconds + self._rest.wait(interval_in_seconds) + def relax(self): + self._adrenaline.clear() -class LoggingMixin(object): + def hurry(self): + self._adrenaline.set() - def _log(self, level, msg, *attrs): - _log.log(level, '%s: %s' % (self._io._url, msg), *attrs) - - -class LoggingEngineIONamespace(EngineIONamespace, LoggingMixin): - - def on_event(self, event, *args): - callback, args = find_callback(args) - arguments = [repr(_) for _ in args] - if callback: - arguments.append('callback(*args)') - self._log( - logging.INFO, '[event] %s(%s)', - event, ', '.join(arguments)) - super(LoggingEngineIONamespace, self).on_event(event, *args) - - -class LoggingSocketIONamespace(SocketIONamespace, LoggingMixin): - - def on_event(self, event, *args): - callback, args = find_callback(args) - arguments = [repr(_) for _ in args] - if callback: - arguments.append('callback(*args)') - self._log( - logging.INFO, '%s [event] %s(%s)', self.path, - event, ', '.join(arguments)) - super(LoggingSocketIONamespace, self).on_event(event, *args) - - def on_connect(self): - self._log(logging.DEBUG, '%s [connect]', self.path) - super(LoggingSocketIONamespace, self).on_connect() - - def on_reconnect(self): - self._log(logging.DEBUG, '%s [reconnect]', self.path) - super(LoggingSocketIONamespace, self).on_reconnect() - - def on_disconnect(self): - self._log(logging.DEBUG, '%s [disconnect]', self.path) - super(LoggingSocketIONamespace, self).on_disconnect() + def stop(self): + self._stop.set() def find_callback(args, kw=None): @@ -445,11 +489,31 @@ def _make_packet_header(packet_string): def _parse_engineIO_data(data): - return data + return EngineIOData(data=data) def _parse_socketIO_data(data): - return data + if data.startswith('/'): + try: + path, data = data.split(',', 1) + except ValueError: + path = data + data = '' + else: + path = '' + try: + ack_id = int(data[0]) + data = data[1:] + except (ValueError, IndexError): + ack_id = None + if data: + x = json.loads(data) + event = x[0] + args = x[1:] + else: + event = '' + args = [] + return SocketIOData(path=path, ack_id=ack_id, event=event, args=args) def _yield_warning_screen(seconds=None): @@ -472,38 +536,3 @@ def _yield_elapsed_time(seconds=None): yield time.time() - start_time while time.time() - start_time < seconds: yield time.time() - start_time - - -class HeartbeatThread(threading.Thread): - - daemon = True - - def __init__( - self, send_heartbeat, - relax_interval_in_seconds, - hurry_interval_in_seconds): - super(HeartbeatThread, self).__init__() - self._send_heartbeat = send_heartbeat - self._relax_interval_in_seconds = relax_interval_in_seconds - self._hurry_interval_in_seconds = hurry_interval_in_seconds - self._adrenaline = threading.Event() - self._rest = threading.Event() - self._stop = threading.Event() - - def run(self): - while not self._stop.is_set(): - self._send_heartbeat() - if self._adrenaline.is_set(): - interval_in_seconds = self._hurry_interval_in_seconds - else: - interval_in_seconds = self._relax_interval_in_seconds - self._rest.wait(interval_in_seconds) - - def relax(self): - self._adrenaline.clear() - - def hurry(self): - self._adrenaline.set() - - def stop(self): - self._stop.set()