Isolate transport

This commit is contained in:
Roy Hyunjin Han 2015-02-20 18:06:31 -05:00
commit 9afe5c2f2a
3 changed files with 183 additions and 103 deletions

View file

@ -5,9 +5,9 @@ import threading
import time
from collections import namedtuple
from .compat import get_byte, get_character, get_unicode
from . import transports
from .compat import get_byte, get_character, get_unicode, parse_url
from .exceptions import ConnectionError, TimeoutError, PacketError
from .transports import _get_response
__version__ = '0.6.1'
@ -183,7 +183,6 @@ class LoggingSocketIONamespace(SocketIONamespace, LoggingMixin):
class EngineIO(object):
_engineIO_protocol = 3
_engineIO_request_index = 0
def __init__(
@ -191,31 +190,22 @@ class EngineIO(object):
host, port=None, Namespace=None,
wait_for_connection=True, transports=TRANSPORTS,
resource='engine.io', **kw):
self._url = 'http://%s:%s/%s/' % (host, port, resource)
self._is_secure, self._url = _parse_host(host, port, resource)
self._wait_for_connection = wait_for_connection
self._client_transports = transports
self._kw = kw
self._http_session = requests.Session()
response = self._http_session.get(self._url, params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
})
engineIO_packets = _decode_engineIO_content(response.content)
engineIO_packet_type, engineIO_packet_data = engineIO_packets[0]
assert engineIO_packet_type == 0
value_by_name = json.loads(get_unicode(engineIO_packet_data))
# 'websocket' in value_by_name['upgrades']
self._ping_interval = value_by_name['pingInterval'] / float(1000)
self._ping_timeout = value_by_name['pingTimeout'] / float(1000)
self._session_id = value_by_name['sid']
if Namespace:
self.define(Namespace)
self._heartbeat_thread = HeartbeatThread(
send_heartbeat=self._ping,
relax_interval_in_seconds=self._ping_interval,
hurry_interval_in_seconds=1)
self._heartbeat_thread.start()
def __enter__(self):
return self
def __exit__(self, *exception_pack):
self.close()
def __del__(self):
self.close()
@property
def connected(self):
@ -266,7 +256,6 @@ class EngineIO(object):
namespace.on_disconnect()
except PacketError:
pass
self._heartbeat_thread.relax()
def _should_stop_waiting(self):
@ -274,7 +263,7 @@ class EngineIO(object):
return self.__transport._wants_to_disconnect
def _process_packets(self):
for engineIO_packet in self._recv_packet():
for engineIO_packet in self._transport.recv_packet():
try:
self._process_packet(engineIO_packet)
except PacketError as e:
@ -332,89 +321,100 @@ class EngineIO(object):
def _message(self, engineIO_packet_data):
engineIO_packet_type = 4
response = self._http_session.post(self._url, params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
}, data=_encode_engineIO_content([
(engineIO_packet_type, engineIO_packet_data),
]), headers={
'content-type': 'application/octet-stream',
})
print('message()')
print(response.content)
engineIO_packets = _decode_engineIO_content(response.content)
for engineIO_packet_type, engineIO_packet_data in engineIO_packets:
socketIO_packet_type = int(get_character(engineIO_packet_data, 0))
socketIO_packet_data = engineIO_packet_data[1:]
print('engineIO_packet_type = %s' % engineIO_packet_type)
print('socketIO_packet_type = %s' % socketIO_packet_type)
print('socketIO_packet_data = %s' % socketIO_packet_data)
self._transport.send_packet(engineIO_packet_type, engineIO_packet_data)
def _ping(self, engineIO_packet_data=''):
engineIO_packet_type = 2
response = self._http_session.post(self._url, params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
}, data=_encode_engineIO_content([
(engineIO_packet_type, engineIO_packet_data),
]), headers={
'content-type': 'application/octet-stream',
})
print('ping()')
print(response.content)
engineIO_packets = _decode_engineIO_content(response.content)
for engineIO_packet_type, engineIO_packet_data in engineIO_packets:
socketIO_packet_type = int(get_character(engineIO_packet_data, 0))
socketIO_packet_data = engineIO_packet_data[1:]
print('engineIO_packet_type = %s' % engineIO_packet_type)
print('socketIO_packet_type = %s' % socketIO_packet_type)
print('socketIO_packet_data = %s' % socketIO_packet_data)
self._transport.send_packet(engineIO_packet_type, engineIO_packet_data)
def _pong(self, engineIO_packet_data=''):
engineIO_packet_type = 3
response = self._http_session.post(self._url, params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
}, data=_encode_engineIO_content([
(engineIO_packet_type, engineIO_packet_data),
]), headers={
'content-type': 'application/octet-stream',
})
print('pong()')
print(response.content)
engineIO_packets = _decode_engineIO_content(response.content)
for engineIO_packet_type, engineIO_packet_data in engineIO_packets:
socketIO_packet_type = int(get_character(engineIO_packet_data, 0))
socketIO_packet_data = engineIO_packet_data[1:]
print('engineIO_packet_type = %s' % engineIO_packet_type)
print('socketIO_packet_type = %s' % socketIO_packet_type)
print('socketIO_packet_data = %s' % socketIO_packet_data)
self._transport.send_packet(engineIO_packet_type, engineIO_packet_data)
def _recv_packet(self):
response = _get_response(
self._http_session.get,
self._url,
params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
},
timeout=self._ping_timeout)
for engineIO_packet in _decode_engineIO_content(response.content):
yield engineIO_packet
@property
def _transport(self):
try:
if self.connected:
return self.__transport
except AttributeError:
pass
self._get_engineIO_session()
self._negotiate_transport()
self._reset_heartbeat()
self._connect_namespaces()
return self.__transport
def _get_engineIO_session(self):
url = '%s://%s/' % ('https' if self.is_secure else 'http', self._url)
warning_screen = _yield_warning_screen()
for elapsed_time in warning_screen:
try:
engineIO_packet_type, engineIO_packet_data = next(
XHR_PollingTransport().recv_packet())
response = _get_response(self._http_session.get, url, params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
}, **self._kw)
except (TimeoutError, ConnectionError) as e:
if not self._wait_for_connection:
raise
warning = Exception('[waiting for connection] %s' % e)
warning_screen.throw(warning)
engineIO_packets = _decode_engineIO_content(response.content)
engineIO_packet_type, engineIO_packet_data = engineIO_packets[0]
assert engineIO_packet_type == 0
value_by_name = json.loads(get_unicode(engineIO_packet_data))
self._session_id = value_by_name['sid']
self._ping_interval = value_by_name['pingInterval'] / float(1000)
self._ping_timeout = value_by_name['pingTimeout'] / float(1000)
self._transport_upgrades = value_by_name['upgrades']
def _negotiate_transport(self):
self.__transport = self._get_transport('xhr-polling')
def _reset_heartbeat(self):
try:
self._heartbeat_thread.stop()
except AttributeError:
pass
self._heartbeat_thread = HeartbeatThread(
send_heartbeat=self.__transport._ping,
relax_interval_in_seconds=self._ping_interval,
hurry_interval_in_seconds=1)
self._heartbeat_thread.start()
def _connect_namespaces(self):
pass
def _get_transport(self, transport_name):
self._log(logging.DEBUG, '[transport chosen] %s', transport_name)
return {
'xhr-polling': transports.XHR_PollingTransport,
}[transport_name]()
def _log(self, level, msg, *attrs):
_log.log(level, '%s: %s' % (self._url, msg), *attrs)
class SocketIO(EngineIO):
"""Create a socket.io client that connects to a socket.io server
at the specified host and port.
- Define the behavior of the client by specifying a custom Namespace.
- Prefix host with https:// to use SSL.
- Set wait_for_connection=True to block until we have a connection.
- Specify desired transports=['websocket', 'xhr-polling'].
- Pass query params, headers, cookies, proxies as keyword arguments.
SocketIO(
'localhost', 8000,
params={'q': 'qqq'},
headers={'Authorization': 'Basic ' + b64encode('username:password')},
cookies={'a': 'aaa'},
proxies={'https': 'https://proxy.example.com:8080'})
"""
def __init__(
self,
@ -429,6 +429,14 @@ class SocketIO(EngineIO):
wait_for_connection, transports,
resource, **kw)
def __exit__(self, *exception_pack):
self.disconnect()
super(SocketIO, self).__exit__(*exception_pack)
def __del__(self):
self.disconnect()
super(SocketIO, self).__del__()
def on(self, event, callback, path=''):
try:
namespace = self.get_namespace(path)
@ -555,6 +563,12 @@ class SocketIO(EngineIO):
'Return function that acknowledges the server'
return lambda *args: self._ack(path, ack_id, *args)
def _connect_namespaces(self):
for path, namespace in self._namespace_by_path.items():
namespace._transport = self.__transport
if path:
self.__transport.connect(path)
class HeartbeatThread(threading.Thread):
@ -573,13 +587,19 @@ class HeartbeatThread(threading.Thread):
self._stop = threading.Event()
def run(self):
while not self._stop.is_set():
self._send_heartbeat()
if self._adrenaline.is_set():
interval_in_seconds = self._hurry_interval_in_seconds
else:
interval_in_seconds = self._relax_interval_in_seconds
self._rest.wait(interval_in_seconds)
try:
while not self._stop.is_set():
try:
self._send_heartbeat()
except TimeoutError:
pass
if self._adrenaline.is_set():
interval_in_seconds = self._hurry_interval_in_seconds
else:
interval_in_seconds = self._relax_interval_in_seconds
self._rest.wait(interval_in_seconds)
except ConnectionError:
pass
def relax(self):
self._adrenaline.clear()
@ -590,6 +610,7 @@ class HeartbeatThread(threading.Thread):
self._rest.clear()
def stop(self):
self._rest.set()
self._stop.set()
@ -603,6 +624,16 @@ def find_callback(args, kw=None):
return None, args
def _parse_host(host, port, resource):
if not host.startswith('http'):
host = 'http://' + host
url_pack = parse_url(host)
is_secure = url_pack.scheme == 'https'
port = port or url_pack.port or (443 if is_secure else 80)
url = '%s:%d%s/%s' % (url_pack.hostname, port, url_pack.path, resource)
return is_secure, url
def _decode_engineIO_content(content):
packets = []
content_index = 0

View file

@ -1,4 +1,8 @@
import six
try:
from urllib.parse import urlparse as parse_url
except ImportError:
from urlparse import urlparse as parse_url
def get_byte(x, index):

View file

@ -2,6 +2,51 @@ import requests
from .exceptions import ConnectionError, TimeoutError
ENGINEIO_PROTOCOL = 3
class AbstractTransport(object):
pass
class XHR_PollingTransport(AbstractTransport):
# pass http session
# pass session id (can be none)
# pass timeout
def send_packet(self, engineIO_packet_type, engineIO_packet_data):
_get_response()
assert ok
response = self._http_session.post(self._url, params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
}, data=_encode_engineIO_content([
(engineIO_packet_type, engineIO_packet_data),
]), headers={
'content-type': 'application/octet-stream',
})
assert response.content == 'ok'
def _recv_packet(self):
response = _get_response(
self._http_session.get,
self._url,
params={
'EIO': self._engineIO_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
},
timeout=self._ping_timeout)
for engineIO_packet in _decode_engineIO_content(response.content):
yield engineIO_packet
def _get_response(request, *args, **kw):
try:
response = request(*args, **kw)