Use timeout to unblock recv in websocket transport
This commit is contained in:
parent
978a669d16
commit
f5b157014d
5 changed files with 30 additions and 11 deletions
|
|
@ -103,12 +103,18 @@ class EngineIO(LoggingMixin):
|
|||
except AttributeError:
|
||||
pass
|
||||
ping_interval = self._engineIO_session.ping_interval
|
||||
hurry_interval_in_seconds = self._hurry_interval_in_seconds
|
||||
if self.transport_name.endswith('-polling'):
|
||||
# Use ping/pong to unblock recv for polling transport
|
||||
hurry_interval_in_seconds = self._hurry_interval_in_seconds
|
||||
else:
|
||||
# Use timeout to unblock recv for websocket transport
|
||||
hurry_interval_in_seconds = ping_interval
|
||||
self._heartbeat_thread = HeartbeatThread(
|
||||
send_heartbeat=self._ping,
|
||||
relax_interval_in_seconds=ping_interval,
|
||||
hurry_interval_in_seconds=hurry_interval_in_seconds)
|
||||
self._heartbeat_thread.start()
|
||||
self._debug('[heartbeat reset]')
|
||||
|
||||
def _connect_namespaces(self):
|
||||
pass
|
||||
|
|
@ -202,7 +208,11 @@ class EngineIO(LoggingMixin):
|
|||
|
||||
def wait(self, seconds=None, **kw):
|
||||
'Wait in a loop and react to events as defined in the namespaces'
|
||||
# Use ping/pong to unblock recv for polling transport
|
||||
self._heartbeat_thread.hurry()
|
||||
# Use timeout to unblock recv for websocket transport
|
||||
self._transport.set_timeout(seconds)
|
||||
# Listen
|
||||
warning_screen = self._yield_warning_screen(seconds)
|
||||
for elapsed_time in warning_screen:
|
||||
if self._should_stop_waiting(**kw):
|
||||
|
|
@ -224,6 +234,7 @@ class EngineIO(LoggingMixin):
|
|||
except PacketError:
|
||||
pass
|
||||
self._heartbeat_thread.relax()
|
||||
self._transport.set_timeout()
|
||||
|
||||
def _should_stop_waiting(self):
|
||||
return self._wants_to_close
|
||||
|
|
|
|||
|
|
@ -33,6 +33,10 @@ def _yield_elapsed_time(seconds=None):
|
|||
start_time = time.time()
|
||||
if seconds is None:
|
||||
while True:
|
||||
yield time.time() - start_time
|
||||
while time.time() - start_time < seconds:
|
||||
yield time.time() - start_time
|
||||
yield _get_elapsed_time(start_time)
|
||||
while _get_elapsed_time(start_time) < seconds:
|
||||
yield _get_elapsed_time(start_time)
|
||||
|
||||
|
||||
def _get_elapsed_time(start_time):
|
||||
return time.time() - start_time
|
||||
|
|
|
|||
|
|
@ -116,7 +116,7 @@ class BaseMixin(object):
|
|||
def test_wait_with_disconnect(self):
|
||||
'Exit loop when the client wants to disconnect'
|
||||
self.socketIO.define(Namespace)
|
||||
self.socketIO.emit('wait_with_disconnect')
|
||||
self.socketIO.disconnect()
|
||||
timeout_in_seconds = 5
|
||||
start_time = time.time()
|
||||
self.socketIO.wait(timeout_in_seconds)
|
||||
|
|
@ -171,7 +171,7 @@ class Test_WebsocketTransport(BaseMixin, TestCase):
|
|||
self.socketIO = SocketIO(HOST, PORT, LoggingNamespace, transports=[
|
||||
'xhr-polling', 'websocket'], verify=False)
|
||||
self.assertEqual(self.socketIO.transport_name, 'websocket')
|
||||
self.wait_time_in_seconds = 0.1
|
||||
self.wait_time_in_seconds = 1
|
||||
|
||||
|
||||
class Namespace(LoggingNamespace):
|
||||
|
|
|
|||
|
|
@ -64,9 +64,6 @@ io.on('connection', function(socket) {
|
|||
socket.on('bbb', function(payload, fn) {
|
||||
if (fn) fn(payload);
|
||||
});
|
||||
socket.on('wait_with_disconnect', function() {
|
||||
socket.emit('wait_with_disconnect_response');
|
||||
});
|
||||
});
|
||||
|
||||
io.of('/chat').on('connection', function(socket) {
|
||||
|
|
|
|||
|
|
@ -41,6 +41,9 @@ class AbstractTransport(object):
|
|||
def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
|
||||
pass
|
||||
|
||||
def set_timeout(self, seconds=None):
|
||||
pass
|
||||
|
||||
|
||||
class XHR_PollingTransport(AbstractTransport):
|
||||
|
||||
|
|
@ -105,7 +108,7 @@ class WebsocketTransport(AbstractTransport):
|
|||
kw = {'header': ['%s: %s' % x for x in request.headers.items()]}
|
||||
if engineIO_session:
|
||||
params['sid'] = engineIO_session.id
|
||||
kw['timeout'] = engineIO_session.ping_timeout
|
||||
kw['timeout'] = self._timeout = engineIO_session.ping_timeout
|
||||
ws_url = '%s://%s/?%s' % (
|
||||
'wss' if is_secure else 'ws', url, format_query(params))
|
||||
http_scheme = 'https' if is_secure else 'http'
|
||||
|
|
@ -155,6 +158,9 @@ class WebsocketTransport(AbstractTransport):
|
|||
except websocket.WebSocketConnectionClosedException as e:
|
||||
raise ConnectionError('send disconnected (%s)' % e)
|
||||
|
||||
def set_timeout(self, seconds=None):
|
||||
self._connection.settimeout(seconds or self._timeout)
|
||||
|
||||
|
||||
def get_response(request, *args, **kw):
|
||||
try:
|
||||
|
|
@ -167,7 +173,8 @@ def get_response(request, *args, **kw):
|
|||
raise ConnectionError('could not negotiate SSL (%s)' % e)
|
||||
status_code = response.status_code
|
||||
if 200 != status_code:
|
||||
raise ConnectionError('unexpected status code (%s)' % status_code)
|
||||
raise ConnectionError('unexpected status code (%s %s)' % (
|
||||
status_code, response.text))
|
||||
return response
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue