diff --git a/TODO.goals b/TODO.goals index 50a9c42..c163d11 100644 --- a/TODO.goals +++ b/TODO.goals @@ -1,6 +1,4 @@ Release 0.6.1 #41 #52 - Implement define - Implement wait Implement on Get pong @@ -9,6 +7,5 @@ Release 0.6.1 #41 #52 Put tests in index.html Update tests - Revive heartbeat as separate process Merge sarietta's pull request Implement rooms #65 diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 589370d..1860de4 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,14 +1,17 @@ import json import logging import requests +import threading import time -from .exceptions import PacketError +from .exceptions import PacketError, TimeoutError +from .transports import _get_response __version__ = '0.6.1' _log = logging.getLogger(__name__) TRANSPORTS = [] +RETRY_INTERVAL_IN_SECONDS = 1 class EngineIO(object): @@ -22,10 +25,10 @@ class EngineIO(object): wait_for_connection=True, transports=TRANSPORTS, resource='engine.io', **kw): self._url = 'http://%s:%s/%s/' % (host, port, resource) - self._session = requests.Session() + self._http_session = requests.Session() print self._url - response = self._session.get(self._url, params={ + response = self._http_session.get(self._url, params={ 'EIO': self._engineIO_protocol, 'transport': 'polling', 't': self._get_timestamp(), @@ -35,23 +38,48 @@ class EngineIO(object): engineIO_packets = _decode_content(response.content) engineIO_packet_type, engineIO_packet_data = engineIO_packets[0] assert engineIO_packet_type == 0 - engineIO_packet_json = json.loads(engineIO_packet_data) - print engineIO_packet_json - # engineIO_packet_json['pingInterval'] - # engineIO_packet_json['pingTimeout'] - # engineIO_packet_json['upgrades'] - self._session_id = engineIO_packet_json['sid'] + packet_json = json.loads(engineIO_packet_data) + print packet_json + # packet_json['upgrades'] + self._ping_interval = packet_json['pingInterval'] / float(1000) + self._ping_timeout = packet_json['pingTimeout'] / float(1000) + self._session_id = packet_json['sid'] if Namespace: self.define(Namespace) - def wait(self): - while True: - self._process_packets() + self._heartbeat_thread = HeartbeatThread( + send_heartbeat=self._ping, + relax_interval_in_seconds=self._ping_interval, + hurry_interval_in_seconds=1) + self._heartbeat_thread.start() + + def define(self, Namespace): + self._namespace = namespace = Namespace(self) + return namespace + + def get_namespace(self): + try: + return self._namespace + except AttributeError: + raise PacketError('undefined engine.io namespace') + + def wait(self, seconds=None): + self._heartbeat_thread.hurry() + warning_screen = _yield_warning_screen(seconds) + for elapsed_time in warning_screen: + try: + self._process_packets() + except TimeoutError: + pass + self._heartbeat_thread.relax() def _process_packets(self): for engineIO_packet in self._recv_packet(): - self._process_packet(engineIO_packet) + try: + self._process_packet(engineIO_packet) + except PacketError as e: + self._log(logging.WARNING, '[packet error] %s', e) def _process_packet(self, packet): engineIO_packet_type, engineIO_packet_data = packet @@ -74,17 +102,8 @@ class EngineIO(object): raise PacketError( 'unexpected engine.io packet type (%s)' % engineIO_packet_type) delegate(engineIO_packet_data_parsed, namespace._find_packet_callback) - return engineIO_packet_data - - def define(self, Namespace): - self._namespace = namespace = Namespace(self) - return namespace - - def get_namespace(self): - try: - return self._namespace - except AttributeError: - raise PacketError('undefined engine.io namespace') + if engineIO_packet_type is 4: + return engineIO_packet_data def _on_open(self, data_parsed, find_packet_callback): pass @@ -115,7 +134,7 @@ class EngineIO(object): def _message(self, engineIO_packet_data): engineIO_packet_type = 4 - response = self._session.post(self._url, params={ + response = self._http_session.post(self._url, params={ 'EIO': self._engineIO_protocol, 'transport': 'polling', 't': self._get_timestamp(), @@ -125,6 +144,7 @@ class EngineIO(object): ]), headers={ 'content-type': 'application/octet-stream', }) + print 'message()' engineIO_packets = _decode_content(response.content) for engineIO_packet_type, engineIO_packet_data in engineIO_packets: socketIO_packet_type = int(engineIO_packet_data[0]) @@ -136,7 +156,7 @@ class EngineIO(object): def _ping(self): engineIO_packet_type = 2 engineIO_packet_data = '' - response = self._session.post(self._url, params={ + response = self._http_session.post(self._url, params={ 'EIO': self._engineIO_protocol, 'transport': 'polling', 't': self._get_timestamp(), @@ -146,6 +166,7 @@ class EngineIO(object): ]), headers={ 'content-type': 'application/octet-stream', }) + print 'ping()' engineIO_packets = _decode_content(response.content) for engineIO_packet_type, engineIO_packet_data in engineIO_packets: socketIO_packet_type = int(engineIO_packet_data[0]) @@ -155,15 +176,22 @@ class EngineIO(object): print 'socketIO_packet_data = %s' % socketIO_packet_data def _recv_packet(self): - response = self._session.get(self._url, params={ - 'EIO': self._engineIO_protocol, - 'transport': 'polling', - 't': self._get_timestamp(), - 'sid': self._session_id, - }) + response = _get_response( + self._http_session.get, + self._url, + params={ + 'EIO': self._engineIO_protocol, + 'transport': 'polling', + 't': self._get_timestamp(), + 'sid': self._session_id, + }, + timeout=self._ping_timeout) for engineIO_packet in _decode_content(response.content): yield engineIO_packet + def _log(self, level, msg, *attrs): + _log.log(level, '%s: %s' % (self._url, msg), *attrs) + class SocketIO(EngineIO): @@ -200,6 +228,8 @@ class SocketIO(EngineIO): def _process_packet(self, packet): engineIO_packet_data = super(SocketIO, self)._process_packet(packet) + if engineIO_packet_data is None: + return socketIO_packet_type = int(engineIO_packet_data[0]) socketIO_packet_data = engineIO_packet_data[1:] print 'socketIO_packet_type = %s' % socketIO_packet_type @@ -220,6 +250,8 @@ class SocketIO(EngineIO): except KeyError: raise PacketError( 'unexpected socket.io packet type (%s)' % socketIO_packet_type) + print socketIO_packet_data_parsed + print namespace._find_packet_callback delegate(socketIO_packet_data_parsed, namespace._find_packet_callback) return socketIO_packet_data @@ -418,3 +450,60 @@ def _parse_engineIO_data(data): def _parse_socketIO_data(data): return data + + +def _yield_warning_screen(seconds=None): + last_warning = None + for elapsed_time in _yield_elapsed_time(seconds): + try: + yield elapsed_time + except Exception as warning: + warning = str(warning) + if last_warning != warning: + last_warning = warning + _log.warn(warning) + time.sleep(RETRY_INTERVAL_IN_SECONDS) + + +def _yield_elapsed_time(seconds=None): + start_time = time.time() + if seconds is None: + while True: + yield time.time() - start_time + while time.time() - start_time < seconds: + yield time.time() - start_time + + +class HeartbeatThread(threading.Thread): + + daemon = True + + def __init__( + self, send_heartbeat, + relax_interval_in_seconds, + hurry_interval_in_seconds): + super(HeartbeatThread, self).__init__() + self._send_heartbeat = send_heartbeat + self._relax_interval_in_seconds = relax_interval_in_seconds + self._hurry_interval_in_seconds = hurry_interval_in_seconds + self._adrenaline = threading.Event() + self._rest = threading.Event() + self._stop = threading.Event() + + def run(self): + while not self._stop.is_set(): + self._send_heartbeat() + if self._adrenaline.is_set(): + interval_in_seconds = self._hurry_interval_in_seconds + else: + interval_in_seconds = self._relax_interval_in_seconds + self._rest.wait(interval_in_seconds) + + def relax(self): + self._adrenaline.clear() + + def hurry(self): + self._adrenaline.set() + + def stop(self): + self._stop.set() diff --git a/socketIO_client/exceptions.py b/socketIO_client/exceptions.py index b90fe20..ed2b4d2 100644 --- a/socketIO_client/exceptions.py +++ b/socketIO_client/exceptions.py @@ -2,5 +2,13 @@ class SocketIOError(Exception): pass +class ConnectionError(SocketIOError): + pass + + +class TimeoutError(SocketIOError): + pass + + class PacketError(SocketIOError): pass diff --git a/socketIO_client/tests/__init__.py b/socketIO_client/tests/__init__.py index ed194c3..8143a05 100644 --- a/socketIO_client/tests/__init__.py +++ b/socketIO_client/tests/__init__.py @@ -35,6 +35,7 @@ class Namespace(LoggingSocketIONamespace): self.args_by_event = {} def on_event(self, event, *args): + print 'xxx *** xxx' callback, args = find_callback(args) if callback: callback(*args) diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py new file mode 100644 index 0000000..7620855 --- /dev/null +++ b/socketIO_client/transports.py @@ -0,0 +1,17 @@ +import requests +from .exceptions import ConnectionError, TimeoutError + + +def _get_response(request, *args, **kw): + try: + response = request(*args, **kw) + except requests.exceptions.Timeout as e: + raise TimeoutError(e) + except requests.exceptions.ConnectionError as e: + raise ConnectionError(e) + except requests.exceptions.SSLError as e: + raise ConnectionError('could not negotiate SSL (%s)' % e) + status_code = response.status_code + if 200 != status_code: + raise ConnectionError('unexpected status code (%s)' % status_code) + return response