From 9afe5c2f2a7d40f321e6f062e40733d0045507a2 Mon Sep 17 00:00:00 2001 From: Roy Hyunjin Han Date: Fri, 20 Feb 2015 18:06:31 -0500 Subject: [PATCH] Isolate transport --- socketIO_client/__init__.py | 237 +++++++++++++++++++--------------- socketIO_client/compat.py | 4 + socketIO_client/transports.py | 45 +++++++ 3 files changed, 183 insertions(+), 103 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index d390fda..0b29107 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -5,9 +5,9 @@ import threading import time from collections import namedtuple -from .compat import get_byte, get_character, get_unicode +from . import transports +from .compat import get_byte, get_character, get_unicode, parse_url from .exceptions import ConnectionError, TimeoutError, PacketError -from .transports import _get_response __version__ = '0.6.1' @@ -183,7 +183,6 @@ class LoggingSocketIONamespace(SocketIONamespace, LoggingMixin): class EngineIO(object): - _engineIO_protocol = 3 _engineIO_request_index = 0 def __init__( @@ -191,31 +190,22 @@ class EngineIO(object): host, port=None, Namespace=None, wait_for_connection=True, transports=TRANSPORTS, resource='engine.io', **kw): - self._url = 'http://%s:%s/%s/' % (host, port, resource) + self._is_secure, self._url = _parse_host(host, port, resource) + self._wait_for_connection = wait_for_connection + self._client_transports = transports + self._kw = kw self._http_session = requests.Session() - - response = self._http_session.get(self._url, params={ - 'EIO': self._engineIO_protocol, - 'transport': 'polling', - 't': self._get_timestamp(), - }) - engineIO_packets = _decode_engineIO_content(response.content) - engineIO_packet_type, engineIO_packet_data = engineIO_packets[0] - assert engineIO_packet_type == 0 - value_by_name = json.loads(get_unicode(engineIO_packet_data)) - # 'websocket' in value_by_name['upgrades'] - self._ping_interval = value_by_name['pingInterval'] / float(1000) - self._ping_timeout = value_by_name['pingTimeout'] / float(1000) - self._session_id = value_by_name['sid'] - if Namespace: self.define(Namespace) - self._heartbeat_thread = HeartbeatThread( - send_heartbeat=self._ping, - relax_interval_in_seconds=self._ping_interval, - hurry_interval_in_seconds=1) - self._heartbeat_thread.start() + def __enter__(self): + return self + + def __exit__(self, *exception_pack): + self.close() + + def __del__(self): + self.close() @property def connected(self): @@ -266,7 +256,6 @@ class EngineIO(object): namespace.on_disconnect() except PacketError: pass - self._heartbeat_thread.relax() def _should_stop_waiting(self): @@ -274,7 +263,7 @@ class EngineIO(object): return self.__transport._wants_to_disconnect def _process_packets(self): - for engineIO_packet in self._recv_packet(): + for engineIO_packet in self._transport.recv_packet(): try: self._process_packet(engineIO_packet) except PacketError as e: @@ -332,89 +321,100 @@ class EngineIO(object): def _message(self, engineIO_packet_data): engineIO_packet_type = 4 - response = self._http_session.post(self._url, params={ - 'EIO': self._engineIO_protocol, - 'transport': 'polling', - 't': self._get_timestamp(), - 'sid': self._session_id, - }, data=_encode_engineIO_content([ - (engineIO_packet_type, engineIO_packet_data), - ]), headers={ - 'content-type': 'application/octet-stream', - }) - print('message()') - print(response.content) - engineIO_packets = _decode_engineIO_content(response.content) - for engineIO_packet_type, engineIO_packet_data in engineIO_packets: - socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) - socketIO_packet_data = engineIO_packet_data[1:] - print('engineIO_packet_type = %s' % engineIO_packet_type) - print('socketIO_packet_type = %s' % socketIO_packet_type) - print('socketIO_packet_data = %s' % socketIO_packet_data) + self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) def _ping(self, engineIO_packet_data=''): engineIO_packet_type = 2 - response = self._http_session.post(self._url, params={ - 'EIO': self._engineIO_protocol, - 'transport': 'polling', - 't': self._get_timestamp(), - 'sid': self._session_id, - }, data=_encode_engineIO_content([ - (engineIO_packet_type, engineIO_packet_data), - ]), headers={ - 'content-type': 'application/octet-stream', - }) - print('ping()') - print(response.content) - engineIO_packets = _decode_engineIO_content(response.content) - for engineIO_packet_type, engineIO_packet_data in engineIO_packets: - socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) - socketIO_packet_data = engineIO_packet_data[1:] - print('engineIO_packet_type = %s' % engineIO_packet_type) - print('socketIO_packet_type = %s' % socketIO_packet_type) - print('socketIO_packet_data = %s' % socketIO_packet_data) + self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) def _pong(self, engineIO_packet_data=''): engineIO_packet_type = 3 - response = self._http_session.post(self._url, params={ - 'EIO': self._engineIO_protocol, - 'transport': 'polling', - 't': self._get_timestamp(), - 'sid': self._session_id, - }, data=_encode_engineIO_content([ - (engineIO_packet_type, engineIO_packet_data), - ]), headers={ - 'content-type': 'application/octet-stream', - }) - print('pong()') - print(response.content) - engineIO_packets = _decode_engineIO_content(response.content) - for engineIO_packet_type, engineIO_packet_data in engineIO_packets: - socketIO_packet_type = int(get_character(engineIO_packet_data, 0)) - socketIO_packet_data = engineIO_packet_data[1:] - print('engineIO_packet_type = %s' % engineIO_packet_type) - print('socketIO_packet_type = %s' % socketIO_packet_type) - print('socketIO_packet_data = %s' % socketIO_packet_data) + self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) - def _recv_packet(self): - response = _get_response( - self._http_session.get, - self._url, - params={ - 'EIO': self._engineIO_protocol, - 'transport': 'polling', - 't': self._get_timestamp(), - 'sid': self._session_id, - }, - timeout=self._ping_timeout) - for engineIO_packet in _decode_engineIO_content(response.content): - yield engineIO_packet + @property + def _transport(self): + try: + if self.connected: + return self.__transport + except AttributeError: + pass + self._get_engineIO_session() + self._negotiate_transport() + self._reset_heartbeat() + self._connect_namespaces() + return self.__transport + + def _get_engineIO_session(self): + url = '%s://%s/' % ('https' if self.is_secure else 'http', self._url) + warning_screen = _yield_warning_screen() + for elapsed_time in warning_screen: + try: + engineIO_packet_type, engineIO_packet_data = next( + XHR_PollingTransport().recv_packet()) + + response = _get_response(self._http_session.get, url, params={ + 'EIO': self._engineIO_protocol, + 'transport': 'polling', + 't': self._get_timestamp(), + }, **self._kw) + except (TimeoutError, ConnectionError) as e: + if not self._wait_for_connection: + raise + warning = Exception('[waiting for connection] %s' % e) + warning_screen.throw(warning) + engineIO_packets = _decode_engineIO_content(response.content) + engineIO_packet_type, engineIO_packet_data = engineIO_packets[0] + assert engineIO_packet_type == 0 + value_by_name = json.loads(get_unicode(engineIO_packet_data)) + self._session_id = value_by_name['sid'] + self._ping_interval = value_by_name['pingInterval'] / float(1000) + self._ping_timeout = value_by_name['pingTimeout'] / float(1000) + self._transport_upgrades = value_by_name['upgrades'] + + def _negotiate_transport(self): + self.__transport = self._get_transport('xhr-polling') + + def _reset_heartbeat(self): + try: + self._heartbeat_thread.stop() + except AttributeError: + pass + self._heartbeat_thread = HeartbeatThread( + send_heartbeat=self.__transport._ping, + relax_interval_in_seconds=self._ping_interval, + hurry_interval_in_seconds=1) + self._heartbeat_thread.start() + + def _connect_namespaces(self): + pass + + def _get_transport(self, transport_name): + self._log(logging.DEBUG, '[transport chosen] %s', transport_name) + return { + 'xhr-polling': transports.XHR_PollingTransport, + }[transport_name]() def _log(self, level, msg, *attrs): _log.log(level, '%s: %s' % (self._url, msg), *attrs) class SocketIO(EngineIO): + """Create a socket.io client that connects to a socket.io server + at the specified host and port. + + - Define the behavior of the client by specifying a custom Namespace. + - Prefix host with https:// to use SSL. + - Set wait_for_connection=True to block until we have a connection. + - Specify desired transports=['websocket', 'xhr-polling']. + - Pass query params, headers, cookies, proxies as keyword arguments. + + SocketIO( + 'localhost', 8000, + params={'q': 'qqq'}, + headers={'Authorization': 'Basic ' + b64encode('username:password')}, + cookies={'a': 'aaa'}, + proxies={'https': 'https://proxy.example.com:8080'}) + """ def __init__( self, @@ -429,6 +429,14 @@ class SocketIO(EngineIO): wait_for_connection, transports, resource, **kw) + def __exit__(self, *exception_pack): + self.disconnect() + super(SocketIO, self).__exit__(*exception_pack) + + def __del__(self): + self.disconnect() + super(SocketIO, self).__del__() + def on(self, event, callback, path=''): try: namespace = self.get_namespace(path) @@ -555,6 +563,12 @@ class SocketIO(EngineIO): 'Return function that acknowledges the server' return lambda *args: self._ack(path, ack_id, *args) + def _connect_namespaces(self): + for path, namespace in self._namespace_by_path.items(): + namespace._transport = self.__transport + if path: + self.__transport.connect(path) + class HeartbeatThread(threading.Thread): @@ -573,13 +587,19 @@ class HeartbeatThread(threading.Thread): self._stop = threading.Event() def run(self): - while not self._stop.is_set(): - self._send_heartbeat() - if self._adrenaline.is_set(): - interval_in_seconds = self._hurry_interval_in_seconds - else: - interval_in_seconds = self._relax_interval_in_seconds - self._rest.wait(interval_in_seconds) + try: + while not self._stop.is_set(): + try: + self._send_heartbeat() + except TimeoutError: + pass + if self._adrenaline.is_set(): + interval_in_seconds = self._hurry_interval_in_seconds + else: + interval_in_seconds = self._relax_interval_in_seconds + self._rest.wait(interval_in_seconds) + except ConnectionError: + pass def relax(self): self._adrenaline.clear() @@ -590,6 +610,7 @@ class HeartbeatThread(threading.Thread): self._rest.clear() def stop(self): + self._rest.set() self._stop.set() @@ -603,6 +624,16 @@ def find_callback(args, kw=None): return None, args +def _parse_host(host, port, resource): + if not host.startswith('http'): + host = 'http://' + host + url_pack = parse_url(host) + is_secure = url_pack.scheme == 'https' + port = port or url_pack.port or (443 if is_secure else 80) + url = '%s:%d%s/%s' % (url_pack.hostname, port, url_pack.path, resource) + return is_secure, url + + def _decode_engineIO_content(content): packets = [] content_index = 0 diff --git a/socketIO_client/compat.py b/socketIO_client/compat.py index 1408e19..012993c 100644 --- a/socketIO_client/compat.py +++ b/socketIO_client/compat.py @@ -1,4 +1,8 @@ import six +try: + from urllib.parse import urlparse as parse_url +except ImportError: + from urlparse import urlparse as parse_url def get_byte(x, index): diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 7620855..fe641a7 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -2,6 +2,51 @@ import requests from .exceptions import ConnectionError, TimeoutError +ENGINEIO_PROTOCOL = 3 + + +class AbstractTransport(object): + pass + + +class XHR_PollingTransport(AbstractTransport): + + # pass http session + # pass session id (can be none) + # pass timeout + + def send_packet(self, engineIO_packet_type, engineIO_packet_data): + _get_response() + + assert ok + + response = self._http_session.post(self._url, params={ + 'EIO': self._engineIO_protocol, + 'transport': 'polling', + 't': self._get_timestamp(), + 'sid': self._session_id, + }, data=_encode_engineIO_content([ + (engineIO_packet_type, engineIO_packet_data), + ]), headers={ + 'content-type': 'application/octet-stream', + }) + assert response.content == 'ok' + + def _recv_packet(self): + response = _get_response( + self._http_session.get, + self._url, + params={ + 'EIO': self._engineIO_protocol, + 'transport': 'polling', + 't': self._get_timestamp(), + 'sid': self._session_id, + }, + timeout=self._ping_timeout) + for engineIO_packet in _decode_engineIO_content(response.content): + yield engineIO_packet + + def _get_response(request, *args, **kw): try: response = request(*args, **kw)