Add websocket transport

This commit is contained in:
Roy Hyunjin Han 2015-04-15 07:17:23 -04:00
commit 978a669d16
13 changed files with 250 additions and 78 deletions

View file

@ -8,13 +8,12 @@ before_install:
- sudo apt-get install nodejs
install:
- npm install -G socket.io
- npm install -G http-proxy
- npm install -G yargs
- pip install -U requests
- pip install -U six
- pip install -U websocket-client
- pip install -U coverage
before_script:
- DEBUG=* node socketIO_client/tests/serve.js &
- DEBUG=* node socketIO_client/tests/proxy.js &
- sleep 3
script: nosetests

View file

@ -1,9 +1,6 @@
.. image:: https://travis-ci.org/invisibleroads/socketIO-client.svg?branch=master
:target: https://travis-ci.org/invisibleroads/socketIO-client
.. image:: https://www.bountysource.com/badge/tracker?tracker_id=388415
:target: https://www.bountysource.com/trackers/388415-invisibleroads-socketio-client?utm_source=388415&utm_medium=shield&utm_campaign=TRACKER_BADGE
socketIO-client
===============
@ -139,7 +136,8 @@ Specify params, headers, cookies, proxies thanks to the `requests <http://python
from socketIO_client import SocketIO
from base64 import b64encode
SocketIO('localhost', 8000,
SocketIO(
localhost', 8000,
params={'q': 'qqq'},
headers={'Authorization': 'Basic ' + b64encode('username:password')},
cookies={'a': 'aaa'},

View file

@ -1,9 +1,5 @@
Add Websocket transport
Update proxy to include websocket depending on argument
Use prepared request to get headers from http_session
Include https://github.com/invisibleroads/socketIO-client/issues/68
Add test for on_reconnect using sarietta's bash scripts
Consider logging packets sent and received
Fix https tests for xhr-polling
Fix https tests for websocket
Implement rooms #65
Implement binary event
Implement binary ack

View file

@ -9,7 +9,8 @@ from .parsers import (
format_socketIO_packet_data, parse_socketIO_packet_data,
get_namespace_path)
from .symmetries import get_character
from .transports import XHR_PollingTransport, prepare_http_session, TRANSPORTS
from .transports import (
WebsocketTransport, XHR_PollingTransport, prepare_http_session, TRANSPORTS)
__all__ = 'SocketIO', 'SocketIONamespace'
@ -54,7 +55,7 @@ class EngineIO(LoggingMixin):
if self._opened:
return self._transport_instance
self._engineIO_session = self._get_engineIO_session()
self._transport_instance = self._negotiate_transport()
self._negotiate_transport()
self._connect_namespaces()
self._opened = True
self._reset_heartbeat()
@ -79,8 +80,22 @@ class EngineIO(LoggingMixin):
return parse_engineIO_session(engineIO_packet_data)
def _negotiate_transport(self):
self._transport_name = 'xhr-polling'
return self._get_transport(self._transport_name)
self._transport_instance = self._get_transport('xhr-polling')
self.transport_name = 'xhr-polling'
is_ws_client = 'websocket' in self._client_transports
is_ws_server = 'websocket' in self._engineIO_session.transport_upgrades
if is_ws_client and is_ws_server:
try:
transport = self._get_transport('websocket')
transport.send_packet(2, 'probe')
for packet_type, packet_data in transport.recv_packet():
if packet_type == 3 and packet_data == b'probe':
transport.send_packet(5, '')
self._transport_instance = transport
self.transport_name = 'websocket'
except Exception:
pass
self._debug('[transport selected] %s', self.transport_name)
def _reset_heartbeat(self):
try:
@ -88,10 +103,7 @@ class EngineIO(LoggingMixin):
except AttributeError:
pass
ping_interval = self._engineIO_session.ping_interval
if self._transport_name.endswith('-polling'):
hurry_interval_in_seconds = self._hurry_interval_in_seconds
else:
hurry_interval_in_seconds = ping_interval
hurry_interval_in_seconds = self._hurry_interval_in_seconds
self._heartbeat_thread = HeartbeatThread(
send_heartbeat=self._ping,
relax_interval_in_seconds=ping_interval,
@ -102,9 +114,9 @@ class EngineIO(LoggingMixin):
pass
def _get_transport(self, transport_name):
self._debug('[transport selected] %s', transport_name)
SelectedTransport = {
'xhr-polling': XHR_PollingTransport,
'websocket': WebsocketTransport,
}[transport_name]
return SelectedTransport(
self._http_session, self._is_secure, self._url,
@ -146,7 +158,7 @@ class EngineIO(LoggingMixin):
@retry
def _open(self):
engineIO_packet_type = 0
self._transport.send_packet(engineIO_packet_type, '')
self._transport_instance.send_packet(engineIO_packet_type)
def _close(self):
self._wants_to_close = True
@ -154,34 +166,37 @@ class EngineIO(LoggingMixin):
if not self._opened:
return
engineIO_packet_type = 1
self._transport.send_packet(engineIO_packet_type, '')
self._transport_instance.send_packet(engineIO_packet_type)
self._opened = False
@retry
def _ping(self, engineIO_packet_data=''):
engineIO_packet_type = 2
self._transport.send_packet(engineIO_packet_type, engineIO_packet_data)
self._transport_instance.send_packet(
engineIO_packet_type, engineIO_packet_data)
@retry
def _pong(self, engineIO_packet_data=''):
engineIO_packet_type = 3
self._transport.send_packet(engineIO_packet_type, engineIO_packet_data)
self._transport_instance.send_packet(
engineIO_packet_type, engineIO_packet_data)
@retry
def _message(self, engineIO_packet_data):
engineIO_packet_type = 4
self._transport.send_packet(engineIO_packet_type, engineIO_packet_data)
self._transport_instance.send_packet(
engineIO_packet_type, engineIO_packet_data)
self._debug('[socket.io packet sent] %s', engineIO_packet_data)
@retry
def _upgrade(self):
engineIO_packet_type = 5
self._transport.send_packet(engineIO_packet_type, '')
self._transport_instance.send_packet(engineIO_packet_type)
@retry
def _noop(self):
engineIO_packet_type = 6
self._transport.send_packet(engineIO_packet_type, '')
self._transport_instance.send_packet(engineIO_packet_type)
# React

View file

@ -22,15 +22,15 @@ class HeartbeatThread(Thread):
def run(self):
try:
while not self._halt.is_set():
try:
self._send_heartbeat()
except TimeoutError:
pass
if self._adrenaline.is_set():
interval_in_seconds = self._hurry_interval_in_seconds
else:
interval_in_seconds = self._relax_interval_in_seconds
self._rest.wait(interval_in_seconds)
try:
self._send_heartbeat()
except TimeoutError:
pass
except ConnectionError:
pass

View file

@ -32,8 +32,8 @@ def parse_engineIO_session(engineIO_packet_data):
def encode_engineIO_content(engineIO_packets):
content = bytearray()
for packet_type, packet_data in engineIO_packets:
packet_string = encode_string(str(packet_type) + packet_data)
content.extend(_make_packet_header(packet_string) + packet_string)
packet_text = format_packet_text(packet_type, packet_data)
content.extend(_make_packet_prefix(packet_text) + packet_text)
return content
@ -46,10 +46,10 @@ def decode_engineIO_content(content):
content, content_index)
except IndexError:
break
content_index, packet_string = _read_packet_string(
content_index, packet_text = _read_packet_text(
content, content_index, packet_length)
engineIO_packet_type = int(get_character(packet_string, 0))
engineIO_packet_data = packet_string[1:]
engineIO_packet_type, engineIO_packet_data = parse_packet_text(
packet_text)
yield engineIO_packet_type, engineIO_packet_data
@ -85,6 +85,16 @@ def parse_socketIO_packet_data(socketIO_packet_data):
return SocketIOData(path=path, ack_id=ack_id, args=args)
def format_packet_text(packet_type, packet_data):
return encode_string(str(packet_type) + packet_data)
def parse_packet_text(packet_text):
packet_type = int(get_character(packet_text, 0))
packet_data = packet_text[1:]
return packet_type, packet_data
def get_namespace_path(socketIO_packet_data):
if not socketIO_packet_data.startswith(b'/'):
return ''
@ -98,8 +108,8 @@ def get_namespace_path(socketIO_packet_data):
return ''.join(parts)
def _make_packet_header(packet_string):
length_string = str(len(packet_string))
def _make_packet_prefix(packet):
length_string = str(len(packet))
header_digits = bytearray([0])
for i in range(len(length_string)):
header_digits.append(ord(length_string[i]) - 48)
@ -120,8 +130,8 @@ def _read_packet_length(content, content_index):
return content_index, int(packet_length_string)
def _read_packet_string(content, content_index, packet_length):
def _read_packet_text(content, content_index, packet_length):
while get_byte(content, content_index) == 255:
content_index += 1
packet_string = content[content_index:content_index + packet_length]
return content_index + packet_length, packet_string
packet_text = content[content_index:content_index + packet_length]
return content_index + packet_length, packet_text

View file

@ -1,18 +1,22 @@
import six
try:
from urllib.parse import urlparse as parse_url
from urllib import urlencode as format_query
except ImportError:
from urllib.parse import urlencode as format_query
try:
from urlparse import urlparse as parse_url
except ImportError:
from urllib.parse import urlparse as parse_url
def get_character(x, index):
return chr(get_byte(x, index))
def get_byte(x, index):
return six.indexbytes(x, index)
def get_character(x, index):
return chr(six.indexbytes(x, index))
def encode_string(x):
return x.encode('utf-8')

View file

@ -6,7 +6,7 @@ from .. import SocketIO, LoggingNamespace, find_callback
HOST = 'localhost'
PORT = 8000
PORT = 9000
DATA = 'xxx'
PAYLOAD = {'xxx': 'yyy'}
logging.basicConfig(level=logging.DEBUG)
@ -159,10 +159,21 @@ class Test_XHR_PollingTransport(BaseMixin, TestCase):
def setUp(self):
super(Test_XHR_PollingTransport, self).setUp()
self.socketIO = SocketIO(HOST, PORT, LoggingNamespace, transports=[
'xhr-polling'])
'xhr-polling'], verify=False)
self.assertEqual(self.socketIO.transport_name, 'xhr-polling')
self.wait_time_in_seconds = 1
class Test_WebsocketTransport(BaseMixin, TestCase):
def setUp(self):
super(Test_WebsocketTransport, self).setUp()
self.socketIO = SocketIO(HOST, PORT, LoggingNamespace, transports=[
'xhr-polling', 'websocket'], verify=False)
self.assertEqual(self.socketIO.transport_name, 'websocket')
self.wait_time_in_seconds = 0.1
class Namespace(LoggingNamespace):
def initialize(self):

View file

@ -1,6 +1,6 @@
<script src="/socket.io/socket.io.js"></script>
<script>
var socket = io('http://localhost');
var socket = io('//localhost');
var chat = io('/chat');
var news = io('/news');

View file

@ -1,8 +1,18 @@
// DEBUG=* node serve.js
var app = require('http').createServer(serve).listen(9000);
var argv = require('yargs').argv;
if (argv.secure) {
var fs = require('fs');
var app = require('https').createServer({
key: fs.readFileSync('ssl.key'),
cert: fs.readFileSync('ssl.crt')
}, serve);
} else {
var app = require('http').createServer(serve);
}
app.listen(9000);
var io = require('socket.io')(app);
var fs = require('fs');
var PAYLOAD = {'xxx': 'yyy'};
io.on('connection', function(socket) {

View file

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDbTCCAlWgAwIBAgIJALVDKggptosBMA0GCSqGSIb3DQEBBQUAME0xCzAJBgNV
BAYTAlVTMREwDwYDVQQIDAhOZXcgWW9yazERMA8GA1UEBwwITmV3IFlvcmsxGDAW
BgNVBAoMD0V4YW1wbGUgQ29tcGFueTAeFw0xNTA0MTQyMzE5MDlaFw0xNjA0MTMy
MzE5MDlaME0xCzAJBgNVBAYTAlVTMREwDwYDVQQIDAhOZXcgWW9yazERMA8GA1UE
BwwITmV3IFlvcmsxGDAWBgNVBAoMD0V4YW1wbGUgQ29tcGFueTCCASIwDQYJKoZI
hvcNAQEBBQADggEPADCCAQoCggEBALUOviKWvseO7gbqOXOBp+v6lUmReL2xFTiK
vrbdi9BMUMbl1CE+yiP7JTmSHCFty72xv/iwRgPG3rfUgEOZfuedhYXwl8hsnBzI
ZK3quQiBMxPzpGOO6w5I2moMI+u45gOFpmf8CySjlcnTsKaaMAAr+8IiP+NpiW3U
zKYLPtTkGdwBnilFEdLPlL3N5+yZ1zLA9WVMoEVLkg2Jbo5NEDK3JaGh34zxBOxh
WbhvarAgb8PYBcXJaf9Ctp6M077cr8qIVq/dtBjUQcBNspeDJksuhpMzIRQy0FFX
Qsso132LUsGBZqF4hVuLiSl5m0kjxxY3SOrNcvcEyLhxHJG+jLMCAwEAAaNQME4w
HQYDVR0OBBYEFAEu7HcWyPZYVWHp5NhUlmHTirV8MB8GA1UdIwQYMBaAFAEu7HcW
yPZYVWHp5NhUlmHTirV8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADggEB
AJIUPUwx4sYn1aDTukMzcaGu1uElSdeayCe5mOmKd7XBald0PuUFqboWsTKmqELH
usFDAJNmWS/zCuEuW0huqb1c3orw9IfVFx+oPXAEGFvRmBhKd9UmFpqLmhhRcP25
RT2By00vN1A1f/XW0H2Rj5pgIBdbKGwzabQVy8RTMmmtiNlQqHwElUzggq+EbS2m
XwNJ3bMumwrjciZTbMo0MMTStqF5oqacCfvt2vTT1c2IdpiCafPMYqrldXOJddx5
uJ/Tu75ZHf+pjfg7SRGq5WmoKP36VCKbQJSP6kXcbTMP4KvlWlbmQgDsWvmnA0Nm
283Ms7he6efOphFh2XHxysU=
-----END CERTIFICATE-----

View file

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC1Dr4ilr7Hju4G
6jlzgafr+pVJkXi9sRU4ir623YvQTFDG5dQhPsoj+yU5khwhbcu9sb/4sEYDxt63
1IBDmX7nnYWF8JfIbJwcyGSt6rkIgTMT86RjjusOSNpqDCPruOYDhaZn/Asko5XJ
07CmmjAAK/vCIj/jaYlt1MymCz7U5BncAZ4pRRHSz5S9zefsmdcywPVlTKBFS5IN
iW6OTRAytyWhod+M8QTsYVm4b2qwIG/D2AXFyWn/QraejNO+3K/KiFav3bQY1EHA
TbKXgyZLLoaTMyEUMtBRV0LLKNd9i1LBgWaheIVbi4kpeZtJI8cWN0jqzXL3BMi4
cRyRvoyzAgMBAAECggEAWZMEoAoiMopU2ljwuWNw1z9usinlKTu9uu5xcfjjXdcT
s4Mq0pPE3841WWkev68ZOC0DQ5651M6Di0D3f/olnaMB1wTzorWQ+nBt1tkkV7/L
rFhCgPrI8ZU7cXG2J72fFij97b31KVhBF8vCPnpTuxIHiMD3CKRC2HKKGK/BkwK9
cxlPjUlh2LawIULQM0jl/oLGC6jwTupNrbZogVLBf6Koi3S5o0e9F9KaBl127Mh9
COZIFDnD0exGHCvH49YI/cuwHYeyDlU6Xi7Wv2f3cdmC/enUG5n8Ts/OpDTPiSgf
cZOrVQ8nCm/4jc/qhTubBZwPGqkMpFbE+DRSGDsgKQKBgQDbndwec9lrnyHUgJ/r
hBoMFwFMc6rZvAug4CF1ry7nZqLOI+Xg6rHjseu1Vc8aQMJCuK1YAr6ImYbN1Jwq
5cta4cPlPKis4xBbwhcELfVJB550LIwv3hgN/94Sh8aojPueIJcoUsOrG87wJ2KS
RCPK+Pz7W5b+cBc/Ii9ROGPAdwKBgQDTDY8E0P4Yp/okc2IyNU+gurSP61DITEpq
N51QNQ1BmCID/G7yyErsRJeQHhCKVvGEijh/DB4nQKO9lJcVSh4v511NojwpQ35V
FtSePXmhBY8J7vq/DAkvISi8g1/A6A10XEsPcO4ga7EL5CtzYWC4Vh14a0tWNGxW
vUJWNoiApQKBgQCMxSsK+gcrTN1KcQgQ1qQ7i6NxddLVrgtmG0RXQus1uDwzFh+L
g+RypuEHYvFVSp06V1YFS/0FYiNeXCGd/Z3Tq1L95VvZNdKOfmJyc2L+ZLvUi5lw
NVQF5TRbfFyCPZwrR4iceDCjxTdoCFbOmo6209KU66hlf0PnW+oojZSjEQKBgDTY
nkkcc6OE3BOoeJwN0URzu6aVy4J759400sU0o38bMtlAqh9Mm8YRXsoNXSLmpk8D
tSXKyPoXK2ja/gGsr2ZbTneT+fBzH/z6XH7K8dup4qkgF9UilGIisWqSkrVg5Y2P
VpQlONsRXCGYHnEjnu5JUdPHOfP56G7HsQaZXRCpAoGBAI2E2ui3nyzUWmnpT4gI
vTZE4DqtLSWRL85xSJJdw7phI/dgAtVe5I7vnv8kapD+m4xMoa0J1O5jBwNdC4/C
6rCmFC+wh3qu1f3DRwaUsM/hdkQyA7DQcYzayhiicPQCi0xuJuA8FuYHBkQCmOfN
77zJjF6uScFwr+2Ozpb0H30V
-----END PRIVATE KEY-----

View file

@ -1,12 +1,30 @@
import requests
import six
import socket
import ssl
import sys
import time
import websocket
from .exceptions import ConnectionError, TimeoutError
from .parsers import decode_engineIO_content, encode_engineIO_content
from .parsers import (
encode_engineIO_content, decode_engineIO_content,
format_packet_text, parse_packet_text)
from .symmetries import format_query, parse_url
if not hasattr(websocket, 'create_connection'):
sys.exit("""\
An incompatible websocket library is conflicting with the one we need.
You can remove the incompatible library and install the correct one
by running the following commands:
yes | pip uninstall websocket websocket-client
pip install -U websocket-client""")
ENGINEIO_PROTOCOL = 3
TRANSPORTS = 'websocket', 'xhr-polling'
TRANSPORTS = 'xhr-polling', 'websocket'
class AbstractTransport(object):
@ -20,7 +38,7 @@ class AbstractTransport(object):
def recv_packet(self):
pass
def send_packet(self, engineIO_packet_type, engineIO_packet_data):
def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
pass
@ -29,53 +47,115 @@ class XHR_PollingTransport(AbstractTransport):
def __init__(self, http_session, is_secure, url, engineIO_session=None):
super(XHR_PollingTransport, self).__init__(
http_session, is_secure, url, engineIO_session)
self.http_url = '%s://%s/' % ('https' if is_secure else 'http', url)
self.params = {
'EIO': ENGINEIO_PROTOCOL,
'transport': 'polling',
}
self._params = {
'EIO': ENGINEIO_PROTOCOL, 'transport': 'polling'}
if engineIO_session:
self.request_index = 1
self.kw_get = dict(timeout=engineIO_session.ping_timeout)
self.kw_post = dict(headers={
self._request_index = 1
self._kw_get = dict(timeout=engineIO_session.ping_timeout)
self._kw_post = dict(headers={
'content-type': 'application/octet-stream',
})
self.params['sid'] = engineIO_session.id
self._params['sid'] = engineIO_session.id
else:
self.request_index = 0
self.kw_get = {}
self.kw_post = {}
self._request_index = 0
self._kw_get = {}
self._kw_post = {}
http_scheme = 'https' if is_secure else 'http'
self._http_url = '%s://%s/' % (http_scheme, url)
def recv_packet(self):
params = dict(self.params)
params = dict(self._params)
params['t'] = self._get_timestamp()
response = get_response(
self.http_session.get,
self.http_url,
self._http_url,
params=params,
**self.kw_get)
**self._kw_get)
for engineIO_packet in decode_engineIO_content(response.content):
yield engineIO_packet
engineIO_packet_type, engineIO_packet_data = engineIO_packet
yield engineIO_packet_type, engineIO_packet_data
def send_packet(self, engineIO_packet_type, engineIO_packet_data):
params = dict(self.params)
def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
params = dict(self._params)
params['t'] = self._get_timestamp()
response = get_response(
self.http_session.post,
self.http_url,
self._http_url,
params=params,
data=encode_engineIO_content([
(engineIO_packet_type, engineIO_packet_data),
]),
**self.kw_post)
**self._kw_post)
assert response.content == b'ok'
def _get_timestamp(self):
timestamp = '%s-%s' % (int(time.time() * 1000), self.request_index)
self.request_index += 1
timestamp = '%s-%s' % (int(time.time() * 1000), self._request_index)
self._request_index += 1
return timestamp
class WebsocketTransport(AbstractTransport):
def __init__(self, http_session, is_secure, url, engineIO_session=None):
super(WebsocketTransport, self).__init__(
http_session, is_secure, url, engineIO_session)
params = dict(http_session.params, **{
'EIO': ENGINEIO_PROTOCOL, 'transport': 'websocket'})
request = http_session.prepare_request(requests.Request('GET', url))
kw = {'header': ['%s: %s' % x for x in request.headers.items()]}
if engineIO_session:
params['sid'] = engineIO_session.id
kw['timeout'] = engineIO_session.ping_timeout
ws_url = '%s://%s/?%s' % (
'wss' if is_secure else 'ws', url, format_query(params))
http_scheme = 'https' if is_secure else 'http'
if http_scheme in http_session.proxies: # Use the correct proxy
proxy_url_pack = parse_url(http_session.proxies[http_scheme])
kw['http_proxy_host'] = proxy_url_pack.hostname
kw['http_proxy_port'] = proxy_url_pack.port
if proxy_url_pack.username:
kw['http_proxy_auth'] = (
proxy_url_pack.username, proxy_url_pack.password)
if http_session.verify:
if http_session.cert: # Specify certificate path on disk
if isinstance(http_session.cert, basestring):
kw['ca_certs'] = http_session.cert
else:
kw['ca_certs'] = http_session.cert[0]
else: # Do not verify the SSL certificate
kw['sslopt'] = {'cert_reqs': ssl.CERT_NONE}
try:
self._connection = websocket.create_connection(ws_url, **kw)
except Exception as e:
raise ConnectionError(e)
def recv_packet(self):
try:
packet_text = self._connection.recv()
except websocket.WebSocketTimeoutException as e:
raise TimeoutError('recv timed out (%s)' % e)
except websocket.SSLError as e:
raise ConnectionError('recv disconnected (%s)' % e)
except websocket.WebSocketConnectionClosedException as e:
raise ConnectionError('recv disconnected (%s)' % e)
except socket.error as e:
raise ConnectionError('recv disconnected (%s)' % e)
engineIO_packet_type, engineIO_packet_data = parse_packet_text(
six.b(packet_text))
yield engineIO_packet_type, engineIO_packet_data
def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
packet = format_packet_text(engineIO_packet_type, engineIO_packet_data)
try:
self._connection.send(packet)
except websocket.WebSocketTimeoutException as e:
raise TimeoutError('send timed out (%s)' % e)
except socket.error as e:
raise ConnectionError('send disconnected (%s)' % e)
except websocket.WebSocketConnectionClosedException as e:
raise ConnectionError('send disconnected (%s)' % e)
def get_response(request, *args, **kw):
try:
response = request(*args, **kw)
@ -98,7 +178,7 @@ def prepare_http_session(kw):
http_session.proxies.update(kw.get('proxies', {}))
http_session.hooks.update(kw.get('hooks', {}))
http_session.params.update(kw.get('params', {}))
http_session.verify = kw.get('verify')
http_session.verify = kw.get('verify', True)
http_session.cert = kw.get('cert')
http_session.cookies.update(kw.get('cookies', {}))
return http_session