Add stream=True #45

This commit is contained in:
Roy Hyunjin Han 2015-02-15 08:48:27 -05:00
commit 5f7937dc1e
15 changed files with 29 additions and 394 deletions

View file

@ -1,13 +1,15 @@
+ Add __version__
Check that setting stream=True makes sure that we receive all server events #45
Use HTTP headers for Websocket connection #44
Do not connect to blank namespace if overriding default namespace exists #40
Pass wait timeout to underlying transports #34
Check that we have the correct version of websocket #62
Fall back to next transport #48
Make heartbeat time independent from elapsed time #38
Check that heartbeats are sent even with short wait time #64
Revive heartbeat as separate process thanks to sarietta
Restore namespace as separate process #50
Check that we have the correct version of websocket #62
Fall back to next transport #48
Check that SSL websocket connections work properly #54
Write a unit test to make sure that the client can disconnect properly #42 #46
Make sure that on_reconnect works #61
@ -19,4 +21,3 @@ Run under Python 3 #51
Use urllib.parse.urlparse
Use continuous integration with TravisCI
Credit everyone who took the time to submit an issue or pull request
Add __version__

View file

@ -1,17 +0,0 @@
// DEBUG=* node app.js
var app = require('express')();
var server = require('http').Server(app);
var io = require('socket.io')(server);
server.listen(9000);
app.get('/', function (req, res) {
res.sendFile(__dirname + '/index.html');
});
io.on('connection', function (socket) {
socket.emit('news', { hello: 'world' });
socket.on('my other event', function (data) {
console.log(data);
});
});

View file

@ -1,217 +0,0 @@
import json
import requests
import time
# def decode_payload(payload)
def get_packets(content):
packets = []
index = 0
content_length = len(content)
while index < content_length:
index, packet_length = read_packet_length(content, index)
index, packet = read_packet(content, index, packet_length)
packet_type = int(packet[0])
packet_payload = packet[1:]
packets.append((packet_type, packet_payload))
return packets
def read_packet_length(content, index):
while ord(content[index]) != 0:
index += 1
index += 1
packet_length_string = ''
while ord(content[index]) != 255:
packet_length_string += str(ord(content[index]))
index += 1
return index, int(packet_length_string)
def read_packet(content, index, packet_length):
while ord(content[index]) == 255:
index += 1
packet = content[index:index + packet_length]
return index + packet_length, packet
request_counter = 0
def get_timestamp():
global request_counter
timestamp = '%s-%s' % (int(time.time() * 1000), request_counter)
request_counter += 1
return timestamp
base_url = 'http://localhost:9000'
url = base_url + '/socket.io/'
print '*** Connect'
session = requests.Session()
print session.cookies.items()
response = session.get(url, params={
'EIO': 3,
'transport': 'polling',
't': get_timestamp(),
})
print response.url
print session.cookies.items()
packets = get_packets(response.content)
for packet_type, packet in packets:
print packet_type, packet
packet_type, packet = packets[0]
packet_json = json.loads(packet)
print packet_json
print packet_json['pingInterval']
print packet_json['pingTimeout']
print packet_json['sid']
assert packet_type == 0
response = session.get(url, params={
'EIO': 3,
'transport': 'polling',
't': get_timestamp(),
'sid': packet_json['sid'],
})
print response.url
print session.cookies.items()
packets = get_packets(response.content)
for packet_type, packet in packets:
print 'engineIO_packet_type = %s' % packet_type
print 'socketIO_packet_type = %s' % packet[0]
print 'packet = %s' % packet[1:]
# from IPython import embed; embed()
# def wrap_payload
# def pack_packets
# def pack_packs
# def wrap_payload
def encode_payload(packs):
parts = []
for packet_type, packet in packs:
content = str(packet_type) + str(packet)
parts.append(make_header(content) + content)
return ''.join(parts)
def make_header(content):
length_string = str(len(content))
print length_string
header_digits = [0]
for index in xrange(len(length_string)):
header_digits.append(ord(length_string[index]) - 48)
header_digits.append(255)
print '---'
for x in header_digits:
print str(x)
print '---'
return ''.join(chr(x) for x in header_digits)
# print '***'
# response = session.get(url, params={
# 'EIO': 3,
# 'transport': 'polling',
# 't': get_timestamp(),
# 'sid': packet_json['sid'],
# })
# print response.url
# print response.content
# packets = get_packets(response.content)
# for packet_type, packet in packets:
# print packet_type, packet
print '*** Send event'
packets = [
(4, '2["my other event",{"my":"data"}]'),
]
payload = encode_payload(packets)
print payload
print session.cookies.items()
response = session.post(url, params={
'EIO': 3,
'transport': 'polling',
't': get_timestamp(),
'sid': packet_json['sid'],
}, data=payload, headers={
'content-type': 'application/octet-stream',
})
print response.url
print response.content
from time import sleep
sleep(10)
print '*** Send event'
packets = [
(4, '2["my other event",{"my":"data"}]'),
]
payload = encode_payload(packets)
print payload
print session.cookies.items()
response = session.post(url, params={
'EIO': 3,
'transport': 'polling',
't': get_timestamp(),
'sid': packet_json['sid'],
}, data=payload, headers={
'content-type': 'application/octet-stream',
})
print response.url
print response.content
print '*** Send ping'
packets = [
(2, ''),
]
payload = encode_payload(packets)
print payload
response = session.post(url, params={
'EIO': 3,
'transport': 'polling',
't': get_timestamp(),
'sid': packet_json['sid'],
}, data=payload, headers={
'content-type': 'application/octet-stream',
})
print response.url
print response.content
print '*** Send ping'
packets = [
(2, ''),
]
payload = encode_payload(packets)
print payload
response = session.post(url, params={
'EIO': 3,
'transport': 'polling',
't': get_timestamp(),
'sid': packet_json['sid'],
}, data=payload, headers={
'content-type': 'application/octet-stream',
})
print response.url
print response.content
response = session.get(url, params={
'EIO': 3,
'transport': 'polling',
't': get_timestamp(),
'sid': packet_json['sid'],
})
print response.url
print session.cookies.items()
packets = get_packets(response.content)
for packet_type, packet in packets:
print packet_type, packet

View file

@ -1,11 +0,0 @@
from socketIO_client import SocketIO
# def on_news(self, data):
# print(data)
# self.emit('my other event', {'my': 'data'})
s = SocketIO('localhost', 9000)
s.emit('whee')
# s.on('news', on_news)

View file

@ -1,8 +0,0 @@
<script src='/socket.io/socket.io.js'></script>
<script>
var socket = io.connect('http://localhost');
socket.on('news', function (data) {
console.log(data);
socket.emit('my other event', { my: 'data' });
});
</script>

Binary file not shown.

View file

@ -1,30 +0,0 @@
var proxy = require('http-proxy').createProxyServer({
target: {host: 'localhost', port: 9000}
});
var server = require('http').createServer(function(req, res) {
console.log('[REQUEST.%s] %s', req.method, req.url);
console.log(req['headers']);
if (req.method == 'POST') {
var body = '';
req.on('data', function (data) {
body += data;
});
req.on('end', function () {
print_body('[REQUEST.BODY] ', body);
});
}
var _write = res.write;
res.write = function(data) {
print_body('[RESPONSE.BODY] ', data);
_write.call(res, data);
}
proxy.web(req, res);
});
function print_body(header, body) {
var text = String(body);
console.log(header + text);
for (var i = 0; i < text.length; i++) {
console.log('body[%s] = %s = %s', i, text[i], text.charCodeAt(i));
}
}
server.listen(8000);

Binary file not shown.

View file

@ -1,104 +0,0 @@
import json
import requests
import time
class EngineIO(object):
_path = 'engine.io'
_engine_io_protocol = 3
_request_index = 0
def __init__(self, host, port):
url = 'http://%s:%s/%s/' % (host, port, self._path)
self.session = requests.Session()
response = self.session.get(url, params={
'EIO': self._engine_io_protocol,
'transport': 'polling',
't': self._get_timestamp(),
})
packs = _decode_content(response.content)
packet_type, packet = packs[0]
assert packet_type == 0
packet_json = json.loads(packet)
self._session_id = packet_json['sid']
print(packet_json)
response = self.session.get(url, params={
'EIO': self._engine_io_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
})
packs = _decode_content(response.content)
for packet_type, packet in packs:
print 'engineIO_packet_type = %s' % packet_type
print 'socketIO_packet_type = %s' % packet[0]
print 'packet = %s' % packet[1:]
def _get_timestamp(self):
timestamp = '%s-%s' % (int(time.time() * 1000), self._request_index)
self._request_index += 1
return timestamp
def _message(self, packet):
packet_type = 4
response = self.session.post(self.url, params={
'EIO': self._engine_io_protocol,
'transport': 'polling',
't': self._get_timestamp(),
'sid': self._session_id,
}, data=_encode_content([(packet_type, packet)]), headers={
'content-type': 'application/octet-stream',
})
class SocketIO(EngineIO):
_path = 'socket.io'
_socket_io_protocol = 4
def __init__(self, host, port):
super(SocketIO, self).__init__(host, port)
def on(self, event, callback):
pass
def emit(self, event):
packet_type = 2
packet = json.dumps([event])
self._message(str(packet_type) + packet)
def _decode_content(content):
packs = []
index = 0
content_length = len(content)
while index < content_length:
index, packet_length = _read_packet_length(content, index)
index, packet = _read_packet(content, index, packet_length)
packet_type = int(packet[0])
packet_payload = packet[1:]
packs.append((packet_type, packet_payload))
return packs
def _read_packet_length(content, index):
while ord(content[index]) != 0:
index += 1
index += 1
packet_length_string = ''
while ord(content[index]) != 255:
packet_length_string += str(ord(content[index]))
index += 1
return index, int(packet_length_string)
def _read_packet(content, index, packet_length):
while ord(content[index]) == 255:
index += 1
packet = content[index:index + packet_length]
return index + packet_length, packet
def _encode_content(packs):
pass

View file

@ -51,6 +51,13 @@ var main = io.of('').on('connection', function(socket) {
socket.on('wait_with_disconnect', function() {
socket.emit('wait_with_disconnect_response');
});
/*
socket.on('rapid_fire', function() {
for (var i = 0; i < 100000; i++) {
socket.emit('rapid_fire', i);
}
});
*/
});
var chat = io.of('/chat').on('connection', function (socket) {

View file

@ -3,5 +3,3 @@ detailed-errors = TRUE
with-coverage = TRUE
cover-package = socketIO_client
cover-erase = TRUE
[easy_install]

View file

@ -1,5 +1,6 @@
import os
from setuptools import setup, find_packages
from socketIO_client import __version__
here = os.path.abspath(os.path.dirname(__file__))
@ -9,7 +10,7 @@ CHANGES = open(os.path.join(here, 'CHANGES.rst')).read()
setup(
name='socketIO-client',
version='0.5.3.2',
version=__version__,
description='A socket.io client library',
long_description=README + '\n\n' + CHANGES,
license='MIT',

View file

@ -12,6 +12,7 @@ from .exceptions import ConnectionError, TimeoutError, PacketError
from .transports import _get_response, _negotiate_transport, TRANSPORTS
__version__ = '0.5.4'
_SocketIOSession = namedtuple('_SocketIOSession', [
'id',
'heartbeat_timeout',

View file

@ -171,6 +171,15 @@ class BaseMixin(object):
'ack_callback_response': (PAYLOAD,),
})
"""
def test_rapid_fire(self):
'Capture all server events'
namespace = self.socketIO.define(Namespace)
self.socketIO.emit('rapid_fire')
self.socketIO.wait(30)
self.assertEqual(namespace.messages, range(100000))
"""
class Test_WebsocketTransport(TestCase, BaseMixin):
@ -202,6 +211,7 @@ class Namespace(BaseNamespace):
self.response = None
self.args_by_event = {}
self.called_on_disconnect = False
self.messages = []
def on_disconnect(self):
self.called_on_disconnect = True
@ -217,3 +227,6 @@ class Namespace(BaseNamespace):
def on_wait_with_disconnect_response(self):
self.disconnect()
def on_rapid_fire(self, x):
self.messages.append(x)

View file

@ -204,7 +204,8 @@ class _XHR_PollingTransport(_AbstractTransport):
self._http_session.get,
self._url,
params=self._params,
timeout=TIMEOUT_IN_SECONDS)
timeout=TIMEOUT_IN_SECONDS,
stream=True)
response_text = response.text
if not response_text.startswith(BOUNDARY):
yield response_text