Revive heartbeat thread to exit recv with pong thanks to sarietta
This commit is contained in:
parent
911c04cbf1
commit
8289624c47
5 changed files with 147 additions and 35 deletions
|
|
@ -1,14 +1,17 @@
|
|||
import json
|
||||
import logging
|
||||
import requests
|
||||
import threading
|
||||
import time
|
||||
|
||||
from .exceptions import PacketError
|
||||
from .exceptions import PacketError, TimeoutError
|
||||
from .transports import _get_response
|
||||
|
||||
|
||||
__version__ = '0.6.1'
|
||||
_log = logging.getLogger(__name__)
|
||||
TRANSPORTS = []
|
||||
RETRY_INTERVAL_IN_SECONDS = 1
|
||||
|
||||
|
||||
class EngineIO(object):
|
||||
|
|
@ -22,10 +25,10 @@ class EngineIO(object):
|
|||
wait_for_connection=True, transports=TRANSPORTS,
|
||||
resource='engine.io', **kw):
|
||||
self._url = 'http://%s:%s/%s/' % (host, port, resource)
|
||||
self._session = requests.Session()
|
||||
self._http_session = requests.Session()
|
||||
print self._url
|
||||
|
||||
response = self._session.get(self._url, params={
|
||||
response = self._http_session.get(self._url, params={
|
||||
'EIO': self._engineIO_protocol,
|
||||
'transport': 'polling',
|
||||
't': self._get_timestamp(),
|
||||
|
|
@ -35,23 +38,48 @@ class EngineIO(object):
|
|||
engineIO_packets = _decode_content(response.content)
|
||||
engineIO_packet_type, engineIO_packet_data = engineIO_packets[0]
|
||||
assert engineIO_packet_type == 0
|
||||
engineIO_packet_json = json.loads(engineIO_packet_data)
|
||||
print engineIO_packet_json
|
||||
# engineIO_packet_json['pingInterval']
|
||||
# engineIO_packet_json['pingTimeout']
|
||||
# engineIO_packet_json['upgrades']
|
||||
self._session_id = engineIO_packet_json['sid']
|
||||
packet_json = json.loads(engineIO_packet_data)
|
||||
print packet_json
|
||||
# packet_json['upgrades']
|
||||
self._ping_interval = packet_json['pingInterval'] / float(1000)
|
||||
self._ping_timeout = packet_json['pingTimeout'] / float(1000)
|
||||
self._session_id = packet_json['sid']
|
||||
|
||||
if Namespace:
|
||||
self.define(Namespace)
|
||||
|
||||
def wait(self):
|
||||
while True:
|
||||
self._process_packets()
|
||||
self._heartbeat_thread = HeartbeatThread(
|
||||
send_heartbeat=self._ping,
|
||||
relax_interval_in_seconds=self._ping_interval,
|
||||
hurry_interval_in_seconds=1)
|
||||
self._heartbeat_thread.start()
|
||||
|
||||
def define(self, Namespace):
|
||||
self._namespace = namespace = Namespace(self)
|
||||
return namespace
|
||||
|
||||
def get_namespace(self):
|
||||
try:
|
||||
return self._namespace
|
||||
except AttributeError:
|
||||
raise PacketError('undefined engine.io namespace')
|
||||
|
||||
def wait(self, seconds=None):
|
||||
self._heartbeat_thread.hurry()
|
||||
warning_screen = _yield_warning_screen(seconds)
|
||||
for elapsed_time in warning_screen:
|
||||
try:
|
||||
self._process_packets()
|
||||
except TimeoutError:
|
||||
pass
|
||||
self._heartbeat_thread.relax()
|
||||
|
||||
def _process_packets(self):
|
||||
for engineIO_packet in self._recv_packet():
|
||||
self._process_packet(engineIO_packet)
|
||||
try:
|
||||
self._process_packet(engineIO_packet)
|
||||
except PacketError as e:
|
||||
self._log(logging.WARNING, '[packet error] %s', e)
|
||||
|
||||
def _process_packet(self, packet):
|
||||
engineIO_packet_type, engineIO_packet_data = packet
|
||||
|
|
@ -74,17 +102,8 @@ class EngineIO(object):
|
|||
raise PacketError(
|
||||
'unexpected engine.io packet type (%s)' % engineIO_packet_type)
|
||||
delegate(engineIO_packet_data_parsed, namespace._find_packet_callback)
|
||||
return engineIO_packet_data
|
||||
|
||||
def define(self, Namespace):
|
||||
self._namespace = namespace = Namespace(self)
|
||||
return namespace
|
||||
|
||||
def get_namespace(self):
|
||||
try:
|
||||
return self._namespace
|
||||
except AttributeError:
|
||||
raise PacketError('undefined engine.io namespace')
|
||||
if engineIO_packet_type is 4:
|
||||
return engineIO_packet_data
|
||||
|
||||
def _on_open(self, data_parsed, find_packet_callback):
|
||||
pass
|
||||
|
|
@ -115,7 +134,7 @@ class EngineIO(object):
|
|||
|
||||
def _message(self, engineIO_packet_data):
|
||||
engineIO_packet_type = 4
|
||||
response = self._session.post(self._url, params={
|
||||
response = self._http_session.post(self._url, params={
|
||||
'EIO': self._engineIO_protocol,
|
||||
'transport': 'polling',
|
||||
't': self._get_timestamp(),
|
||||
|
|
@ -125,6 +144,7 @@ class EngineIO(object):
|
|||
]), headers={
|
||||
'content-type': 'application/octet-stream',
|
||||
})
|
||||
print 'message()'
|
||||
engineIO_packets = _decode_content(response.content)
|
||||
for engineIO_packet_type, engineIO_packet_data in engineIO_packets:
|
||||
socketIO_packet_type = int(engineIO_packet_data[0])
|
||||
|
|
@ -136,7 +156,7 @@ class EngineIO(object):
|
|||
def _ping(self):
|
||||
engineIO_packet_type = 2
|
||||
engineIO_packet_data = ''
|
||||
response = self._session.post(self._url, params={
|
||||
response = self._http_session.post(self._url, params={
|
||||
'EIO': self._engineIO_protocol,
|
||||
'transport': 'polling',
|
||||
't': self._get_timestamp(),
|
||||
|
|
@ -146,6 +166,7 @@ class EngineIO(object):
|
|||
]), headers={
|
||||
'content-type': 'application/octet-stream',
|
||||
})
|
||||
print 'ping()'
|
||||
engineIO_packets = _decode_content(response.content)
|
||||
for engineIO_packet_type, engineIO_packet_data in engineIO_packets:
|
||||
socketIO_packet_type = int(engineIO_packet_data[0])
|
||||
|
|
@ -155,15 +176,22 @@ class EngineIO(object):
|
|||
print 'socketIO_packet_data = %s' % socketIO_packet_data
|
||||
|
||||
def _recv_packet(self):
|
||||
response = self._session.get(self._url, params={
|
||||
'EIO': self._engineIO_protocol,
|
||||
'transport': 'polling',
|
||||
't': self._get_timestamp(),
|
||||
'sid': self._session_id,
|
||||
})
|
||||
response = _get_response(
|
||||
self._http_session.get,
|
||||
self._url,
|
||||
params={
|
||||
'EIO': self._engineIO_protocol,
|
||||
'transport': 'polling',
|
||||
't': self._get_timestamp(),
|
||||
'sid': self._session_id,
|
||||
},
|
||||
timeout=self._ping_timeout)
|
||||
for engineIO_packet in _decode_content(response.content):
|
||||
yield engineIO_packet
|
||||
|
||||
def _log(self, level, msg, *attrs):
|
||||
_log.log(level, '%s: %s' % (self._url, msg), *attrs)
|
||||
|
||||
|
||||
class SocketIO(EngineIO):
|
||||
|
||||
|
|
@ -200,6 +228,8 @@ class SocketIO(EngineIO):
|
|||
|
||||
def _process_packet(self, packet):
|
||||
engineIO_packet_data = super(SocketIO, self)._process_packet(packet)
|
||||
if engineIO_packet_data is None:
|
||||
return
|
||||
socketIO_packet_type = int(engineIO_packet_data[0])
|
||||
socketIO_packet_data = engineIO_packet_data[1:]
|
||||
print 'socketIO_packet_type = %s' % socketIO_packet_type
|
||||
|
|
@ -220,6 +250,8 @@ class SocketIO(EngineIO):
|
|||
except KeyError:
|
||||
raise PacketError(
|
||||
'unexpected socket.io packet type (%s)' % socketIO_packet_type)
|
||||
print socketIO_packet_data_parsed
|
||||
print namespace._find_packet_callback
|
||||
delegate(socketIO_packet_data_parsed, namespace._find_packet_callback)
|
||||
return socketIO_packet_data
|
||||
|
||||
|
|
@ -418,3 +450,60 @@ def _parse_engineIO_data(data):
|
|||
|
||||
def _parse_socketIO_data(data):
|
||||
return data
|
||||
|
||||
|
||||
def _yield_warning_screen(seconds=None):
|
||||
last_warning = None
|
||||
for elapsed_time in _yield_elapsed_time(seconds):
|
||||
try:
|
||||
yield elapsed_time
|
||||
except Exception as warning:
|
||||
warning = str(warning)
|
||||
if last_warning != warning:
|
||||
last_warning = warning
|
||||
_log.warn(warning)
|
||||
time.sleep(RETRY_INTERVAL_IN_SECONDS)
|
||||
|
||||
|
||||
def _yield_elapsed_time(seconds=None):
|
||||
start_time = time.time()
|
||||
if seconds is None:
|
||||
while True:
|
||||
yield time.time() - start_time
|
||||
while time.time() - start_time < seconds:
|
||||
yield time.time() - start_time
|
||||
|
||||
|
||||
class HeartbeatThread(threading.Thread):
|
||||
|
||||
daemon = True
|
||||
|
||||
def __init__(
|
||||
self, send_heartbeat,
|
||||
relax_interval_in_seconds,
|
||||
hurry_interval_in_seconds):
|
||||
super(HeartbeatThread, self).__init__()
|
||||
self._send_heartbeat = send_heartbeat
|
||||
self._relax_interval_in_seconds = relax_interval_in_seconds
|
||||
self._hurry_interval_in_seconds = hurry_interval_in_seconds
|
||||
self._adrenaline = threading.Event()
|
||||
self._rest = threading.Event()
|
||||
self._stop = threading.Event()
|
||||
|
||||
def run(self):
|
||||
while not self._stop.is_set():
|
||||
self._send_heartbeat()
|
||||
if self._adrenaline.is_set():
|
||||
interval_in_seconds = self._hurry_interval_in_seconds
|
||||
else:
|
||||
interval_in_seconds = self._relax_interval_in_seconds
|
||||
self._rest.wait(interval_in_seconds)
|
||||
|
||||
def relax(self):
|
||||
self._adrenaline.clear()
|
||||
|
||||
def hurry(self):
|
||||
self._adrenaline.set()
|
||||
|
||||
def stop(self):
|
||||
self._stop.set()
|
||||
|
|
|
|||
|
|
@ -2,5 +2,13 @@ class SocketIOError(Exception):
|
|||
pass
|
||||
|
||||
|
||||
class ConnectionError(SocketIOError):
|
||||
pass
|
||||
|
||||
|
||||
class TimeoutError(SocketIOError):
|
||||
pass
|
||||
|
||||
|
||||
class PacketError(SocketIOError):
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ class Namespace(LoggingSocketIONamespace):
|
|||
self.args_by_event = {}
|
||||
|
||||
def on_event(self, event, *args):
|
||||
print 'xxx *** xxx'
|
||||
callback, args = find_callback(args)
|
||||
if callback:
|
||||
callback(*args)
|
||||
|
|
|
|||
17
socketIO_client/transports.py
Normal file
17
socketIO_client/transports.py
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
import requests
|
||||
from .exceptions import ConnectionError, TimeoutError
|
||||
|
||||
|
||||
def _get_response(request, *args, **kw):
|
||||
try:
|
||||
response = request(*args, **kw)
|
||||
except requests.exceptions.Timeout as e:
|
||||
raise TimeoutError(e)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
raise ConnectionError(e)
|
||||
except requests.exceptions.SSLError as e:
|
||||
raise ConnectionError('could not negotiate SSL (%s)' % e)
|
||||
status_code = response.status_code
|
||||
if 200 != status_code:
|
||||
raise ConnectionError('unexpected status code (%s)' % status_code)
|
||||
return response
|
||||
Loading…
Add table
Add a link
Reference in a new issue