From 5ecc5fb36c5ec43308016833bd08825b846d827f Mon Sep 17 00:00:00 2001 From: Roy Hyunjin Han Date: Sat, 21 Feb 2015 13:20:28 -0500 Subject: [PATCH] Split into modules --- socketIO_client/__init__.py | 648 +++++-------------- socketIO_client/heartbeats.py | 47 ++ socketIO_client/logs.py | 38 ++ socketIO_client/namespaces.py | 198 ++++++ socketIO_client/parsers.py | 93 +++ socketIO_client/{compat.py => symmetries.py} | 6 +- socketIO_client/transports.py | 105 ++- 7 files changed, 625 insertions(+), 510 deletions(-) create mode 100644 socketIO_client/heartbeats.py create mode 100644 socketIO_client/logs.py create mode 100644 socketIO_client/namespaces.py create mode 100644 socketIO_client/parsers.py rename socketIO_client/{compat.py => symmetries.py} (79%) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 0b29107..2d1cc66 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,211 +1,34 @@ import json -import logging -import requests -import threading -import time -from collections import namedtuple -from . import transports -from .compat import get_byte, get_character, get_unicode, parse_url from .exceptions import ConnectionError, TimeoutError, PacketError +from .heartbeats import HeartbeatThread +from .logs import LoggingMixin +from .namespaces import EngineIONamespace, SocketIONamespace, find_callback +from .parsers import parse_host, parse_socketIO_data +from .symmetries import encode_string, get_character +from .transports import XHR_PollingTransport, prepare_http_session, TRANSPORTS +__all__ = 'SocketIO', 'SocketIONamespace' __version__ = '0.6.1' -_log = logging.getLogger(__name__) -SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'args']) -TRANSPORTS = [] -RETRY_INTERVAL_IN_SECONDS = 1 -class EngineIONamespace(object): - 'Define engine.io client behavior' - - def __init__(self, io): - self._io = io - self._callback_by_event = {} - self.initialize() - - def initialize(self): - """Initialize custom variables here. - You can override this method.""" - - def on(self, event, callback): - 'Define a callback to handle an event emitted by the server' - self._callback_by_event[event] = callback - - def on_open(self): - """Called after engine.io connects. - You can override this method.""" - - def on_close(self): - """Called after engine.io disconnects. - You can override this method.""" - - def on_ping(self, data): - """Called after engine.io sends a ping packet. - You can override this method.""" - - def on_pong(self, data): - """Called after engine.io sends a pong packet. - You can override this method.""" - - def on_message(self, data): - """Called after engine.io sends a message packet. - You can override this method.""" - - def on_upgrade(self): - """Called after engine.io sends an upgrade packet. - You can override this method.""" - - def on_noop(self): - """Called after engine.io sends a noop packet. - You can override this method.""" - - def _find_packet_callback(self, event): - # Check callbacks defined by on() - try: - return self._callback_by_event[event] - except KeyError: - pass - # Check callbacks defined explicitly - return getattr(self, 'on_' + event) - - -class SocketIONamespace(EngineIONamespace): - 'Define socket.io client behavior' - - def __init__(self, io, path): - self.path = path - super(SocketIONamespace, self).__init__(io) - - def on_connect(self): - """Called after socket.io connects. - You can override this method.""" - - def on_reconnect(self): - """Called after socket.io reconnects. - You can override this method.""" - - def on_disconnect(self): - """Called after socket.io disconnects. - You can override this method.""" - - def on_event(self, event, *args): - """ - Called if there is no matching event handler. - You can override this method. - There are three ways to define an event handler: - - - Call socketIO.on() - - socketIO = SocketIO('localhost', 8000) - socketIO.on('my_event', my_function) - - - Call namespace.on() - - namespace = socketIO.get_namespace() - namespace.on('my_event', my_function) - - - Define namespace.on_xxx - - class Namespace(SocketIONamespace): - - def on_my_event(self, *args): - my_function(*args) - - socketIO.define(Namespace)""" - - def on_error(self, data): - """Called after socket.io sends an error packet. - You can override this method.""" - - def _find_packet_callback(self, event): - # Interpret events - if event == 'connect': - if not hasattr(self, '_was_connected'): - self._was_connected = True - else: - event = 'reconnect' - # Check callbacks defined by on() - try: - return self._callback_by_event[event] - except KeyError: - pass - # Check callbacks defined explicitly or use on_event() - return getattr( - self, 'on_' + event.replace(' ', '_'), - lambda *args: self.on_event(event, *args)) - - -class LoggingMixin(object): - - def _log(self, level, msg, *attrs): - _log.log(level, '%s: %s' % (self._io._url, msg), *attrs) - - -class LoggingEngineIONamespace(EngineIONamespace, LoggingMixin): - - def on_event(self, event, *args): - callback, args = find_callback(args) - arguments = [repr(_) for _ in args] - if callback: - arguments.append('callback(*args)') - self._log( - logging.INFO, '[event] %s(%s)', - event, ', '.join(arguments)) - super(LoggingEngineIONamespace, self).on_event(event, *args) - - -class LoggingSocketIONamespace(SocketIONamespace, LoggingMixin): - - def on_event(self, event, *args): - callback, args = find_callback(args) - arguments = [repr(_) for _ in args] - if callback: - arguments.append('callback(*args)') - self._log( - logging.INFO, '%s [event] %s(%s)', self.path, - event, ', '.join(arguments)) - super(LoggingSocketIONamespace, self).on_event(event, *args) - - def on_connect(self): - self._log(logging.DEBUG, '%s [connect]', self.path) - super(LoggingSocketIONamespace, self).on_connect() - - def on_reconnect(self): - self._log(logging.DEBUG, '%s [reconnect]', self.path) - super(LoggingSocketIONamespace, self).on_reconnect() - - def on_disconnect(self): - self._log(logging.DEBUG, '%s [disconnect]', self.path) - super(LoggingSocketIONamespace, self).on_disconnect() - - -class EngineIO(object): - - _engineIO_request_index = 0 +class EngineIO(LoggingMixin): def __init__( - self, - host, port=None, Namespace=None, + self, host, port=None, Namespace=None, wait_for_connection=True, transports=TRANSPORTS, resource='engine.io', **kw): - self._is_secure, self._url = _parse_host(host, port, resource) + self._is_secure, self._url = parse_host(host, port, resource) self._wait_for_connection = wait_for_connection self._client_transports = transports - self._kw = kw - self._http_session = requests.Session() + self._http_session = prepare_http_session(kw) + self._log_name = self._url + self._wants_to_close = False if Namespace: self.define(Namespace) - def __enter__(self): - return self - - def __exit__(self, *exception_pack): - self.close() - - def __del__(self): - self.close() + # Connect @property def connected(self): @@ -216,6 +39,81 @@ class EngineIO(object): else: return transport.connected + @property + def _transport(self): + try: + if self.connected: + return self.__transport + except AttributeError: + pass + self._get_engineIO_session() + self._negotiate_transport() + self._reset_heartbeat() + self._connect_namespaces() + return self.__transport + + def _get_engineIO_session(self): + warning_screen = self._yield_warning_screen() + for elapsed_time in warning_screen: + transport = XHR_PollingTransport( + self._http_session, self._is_secure, self._url) + try: + engineIO_packet_type, engineIO_packet_data = next( + transport.recv_packet()) + + except (TimeoutError, ConnectionError) as e: + if not self._wait_for_connection: + raise + warning = Exception('[waiting for connection] %s' % e) + warning_screen.throw(warning) + assert engineIO_packet_type == 0 + value_by_name = json.loads(encode_string(engineIO_packet_data)) + self._session_id = value_by_name['sid'] + self._ping_interval = value_by_name['pingInterval'] / float(1000) + self._ping_timeout = value_by_name['pingTimeout'] / float(1000) + self._transport_upgrades = value_by_name['upgrades'] + + def _negotiate_transport(self): + self.__transport = self._get_transport('xhr-polling') + + def _reset_heartbeat(self): + try: + self._heartbeat_thread.stop() + except AttributeError: + pass + self._heartbeat_thread = HeartbeatThread( + send_heartbeat=self.__transport._ping, + relax_interval_in_seconds=self._ping_interval, + hurry_interval_in_seconds=1) + self._heartbeat_thread.start() + + def _connect_namespaces(self): + pass + + def _get_transport(self, transport_name): + self._debug('[transport selected] %s', transport_name) + SelectedTransport = { + 'xhr-polling': XHR_PollingTransport, + }[transport_name] + return SelectedTransport( + self._http_session, self._is_secure, self._url, + self._engineIO_session) + + def __enter__(self): + return self + + def __exit__(self, *exception_pack): + self._close() + + def __del__(self): + self._close() + + # Define + + def define(self, Namespace): + self._namespace = namespace = Namespace(self) + return namespace + def on(self, event, callback): try: namespace = self.get_namespace() @@ -223,20 +121,51 @@ class EngineIO(object): namespace = self.define(EngineIONamespace) return namespace.on(event, callback) - def define(self, Namespace): - self._namespace = namespace = Namespace(self) - return namespace - def get_namespace(self): try: return self._namespace except AttributeError: raise PacketError('undefined engine.io namespace') + # Act + + def _open(self): + engineIO_packet_type = 0 + self._transport.send_packet(engineIO_packet_type, '') + + def _close(self): + self._wants_to_close = True + if not self.connected: + return + engineIO_packet_type = 1 + self._transport.send_packet(engineIO_packet_type, '') + + def _ping(self, engineIO_packet_data=''): + engineIO_packet_type = 2 + self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) + + def _pong(self, engineIO_packet_data=''): + engineIO_packet_type = 3 + self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) + + def _message(self, engineIO_packet_data): + engineIO_packet_type = 4 + self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) + + def _upgrade(self): + engineIO_packet_type = 5 + self._transport.send_packet(engineIO_packet_type, '') + + def _noop(self): + engineIO_packet_type = 6 + self._transport.send_packet(engineIO_packet_type, '') + + # React + def wait(self, seconds=None, **kw): 'Wait in a loop and react to events as defined in the namespaces' self._heartbeat_thread.hurry() - warning_screen = _yield_warning_screen(seconds) + warning_screen = self._yield_warning_screen(seconds) for elapsed_time in warning_screen: if self._should_stop_waiting(**kw): break @@ -250,7 +179,7 @@ class EngineIO(object): warning = Exception('[connection error] %s' % e) warning_screen.throw(warning) except StopIteration: - self._log(logging.WARNING, warning) + self._warn(warning) try: namespace = self.get_namespace() namespace.on_disconnect() @@ -260,14 +189,14 @@ class EngineIO(object): def _should_stop_waiting(self): # Use __transport to make sure that we do not reconnect inadvertently - return self.__transport._wants_to_disconnect + return self._wants_to_close def _process_packets(self): for engineIO_packet in self._transport.recv_packet(): try: self._process_packet(engineIO_packet) except PacketError as e: - self._log(logging.WARNING, '[packet error] %s', e) + self._warn('[packet error] %s', e) def _process_packet(self, packet): engineIO_packet_type, engineIO_packet_data = packet @@ -313,90 +242,6 @@ class EngineIO(object): def _on_noop(self, data, find_packet_callback): find_packet_callback('noop')() - def _get_timestamp(self): - timestamp = '%s-%s' % ( - int(time.time() * 1000), self._engineIO_request_index) - self._engineIO_request_index += 1 - return timestamp - - def _message(self, engineIO_packet_data): - engineIO_packet_type = 4 - self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) - - def _ping(self, engineIO_packet_data=''): - engineIO_packet_type = 2 - self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) - - def _pong(self, engineIO_packet_data=''): - engineIO_packet_type = 3 - self._transport.send_packet(engineIO_packet_type, engineIO_packet_data) - - @property - def _transport(self): - try: - if self.connected: - return self.__transport - except AttributeError: - pass - self._get_engineIO_session() - self._negotiate_transport() - self._reset_heartbeat() - self._connect_namespaces() - return self.__transport - - def _get_engineIO_session(self): - url = '%s://%s/' % ('https' if self.is_secure else 'http', self._url) - warning_screen = _yield_warning_screen() - for elapsed_time in warning_screen: - try: - engineIO_packet_type, engineIO_packet_data = next( - XHR_PollingTransport().recv_packet()) - - response = _get_response(self._http_session.get, url, params={ - 'EIO': self._engineIO_protocol, - 'transport': 'polling', - 't': self._get_timestamp(), - }, **self._kw) - except (TimeoutError, ConnectionError) as e: - if not self._wait_for_connection: - raise - warning = Exception('[waiting for connection] %s' % e) - warning_screen.throw(warning) - engineIO_packets = _decode_engineIO_content(response.content) - engineIO_packet_type, engineIO_packet_data = engineIO_packets[0] - assert engineIO_packet_type == 0 - value_by_name = json.loads(get_unicode(engineIO_packet_data)) - self._session_id = value_by_name['sid'] - self._ping_interval = value_by_name['pingInterval'] / float(1000) - self._ping_timeout = value_by_name['pingTimeout'] / float(1000) - self._transport_upgrades = value_by_name['upgrades'] - - def _negotiate_transport(self): - self.__transport = self._get_transport('xhr-polling') - - def _reset_heartbeat(self): - try: - self._heartbeat_thread.stop() - except AttributeError: - pass - self._heartbeat_thread = HeartbeatThread( - send_heartbeat=self.__transport._ping, - relax_interval_in_seconds=self._ping_interval, - hurry_interval_in_seconds=1) - self._heartbeat_thread.start() - - def _connect_namespaces(self): - pass - - def _get_transport(self, transport_name): - self._log(logging.DEBUG, '[transport chosen] %s', transport_name) - return { - 'xhr-polling': transports.XHR_PollingTransport, - }[transport_name]() - - def _log(self, level, msg, *attrs): - _log.log(level, '%s: %s' % (self._url, msg), *attrs) - class SocketIO(EngineIO): """Create a socket.io client that connects to a socket.io server @@ -417,18 +262,24 @@ class SocketIO(EngineIO): """ def __init__( - self, - host, port=None, Namespace=None, + self, host, port=None, Namespace=None, wait_for_connection=True, transports=TRANSPORTS, resource='socket.io', **kw): self._namespace_by_path = {} self._callback_by_ack_id = {} self._ack_id = 0 super(SocketIO, self).__init__( - host, port, Namespace, - wait_for_connection, transports, + host, port, Namespace, wait_for_connection, transports, resource, **kw) + # Connect + + def _connect_namespaces(self): + for path, namespace in self._namespace_by_path.items(): + namespace._transport = self.__transport + if path: + self.__transport.connect(path) + def __exit__(self, *exception_pack): self.disconnect() super(SocketIO, self).__exit__(*exception_pack) @@ -437,12 +288,7 @@ class SocketIO(EngineIO): self.disconnect() super(SocketIO, self).__del__() - def on(self, event, callback, path=''): - try: - namespace = self.get_namespace(path) - except PacketError: - namespace = self.define(SocketIONamespace, path) - return namespace.on(event, callback) + # Define def define(self, Namespace, path=''): if path: @@ -450,23 +296,26 @@ class SocketIO(EngineIO): self._namespace_by_path[path] = namespace = Namespace(self, path) return namespace + def on(self, event, callback, path=''): + try: + namespace = self.get_namespace(path) + except PacketError: + namespace = self.define(SocketIONamespace, path) + return namespace.on(event, callback) + def get_namespace(self, path=''): try: return self._namespace_by_path[path] except KeyError: raise PacketError('undefined socket.io namespace (%s)' % path) - def wait(self, seconds=None, for_callbacks=False): - super(SocketIO, self).wait(seconds, for_callbacks=for_callbacks) + # Act - def wait_for_callbacks(self, seconds=None): - self.wait(seconds, for_callbacks=True) + def connect(): + pass - def _should_stop_waiting(self, for_callbacks): - # Use __transport to make sure that we do not reconnect inadvertently - if for_callbacks and not self.__transport.has_ack_callback: - return True - return super(SocketIO, self)._should_stop_waiting() + def disconnect(): + pass def emit(self, event, *args, **kw): path = kw.get('path', '') @@ -496,6 +345,20 @@ class SocketIO(EngineIO): args.append(callback) self.emit('message', *args) + # React + + def wait(self, seconds=None, for_callbacks=False): + super(SocketIO, self).wait(seconds, for_callbacks=for_callbacks) + + def wait_for_callbacks(self, seconds=None): + self.wait(seconds, for_callbacks=True) + + def _should_stop_waiting(self, for_callbacks): + # Use __transport to make sure that we do not reconnect inadvertently + if for_callbacks and not self.__transport.has_ack_callback: + return True + return super(SocketIO, self)._should_stop_waiting() + def _process_packet(self, packet): engineIO_packet_data = super(SocketIO, self)._process_packet(packet) if engineIO_packet_data is None: @@ -528,7 +391,7 @@ class SocketIO(EngineIO): find_packet_callback('disconnect')() def _on_event(self, data, find_packet_callback): - data_parsed = _parse_socketIO_data(data) + data_parsed = parse_socketIO_data(data) args = data_parsed.args try: event = args.pop(0) @@ -540,194 +403,25 @@ class SocketIO(EngineIO): find_packet_callback(event)(*args) def _on_ack(self, data, find_packet_callback): - data_parsed = _parse_socketIO_data(data) + data_parsed = parse_socketIO_data(data) try: ack_callback = self._get_ack_callback(data_parsed.ack_id) except KeyError: return ack_callback(*data_parsed.args) - def _get_ack_callback(self, ack_id): - return self._callback_by_ack_id.pop(ack_id) - def _on_error(self, data, find_packet_callback): find_packet_callback('error')(data) def _on_binary_event(self, data, find_packet_callback): - self._log(logging.WARNING, '[not implemented] binary event') + self._warn('[not implemented] binary event') def _on_binary_ack(self, data, find_packet_callback): - self._log(logging.WARNING, '[not implemented] binary ack') + self._warn('[not implemented] binary ack') def _prepare_to_send_ack(self, path, ack_id): 'Return function that acknowledges the server' return lambda *args: self._ack(path, ack_id, *args) - def _connect_namespaces(self): - for path, namespace in self._namespace_by_path.items(): - namespace._transport = self.__transport - if path: - self.__transport.connect(path) - - -class HeartbeatThread(threading.Thread): - - daemon = True - - def __init__( - self, send_heartbeat, - relax_interval_in_seconds, - hurry_interval_in_seconds): - super(HeartbeatThread, self).__init__() - self._send_heartbeat = send_heartbeat - self._relax_interval_in_seconds = relax_interval_in_seconds - self._hurry_interval_in_seconds = hurry_interval_in_seconds - self._adrenaline = threading.Event() - self._rest = threading.Event() - self._stop = threading.Event() - - def run(self): - try: - while not self._stop.is_set(): - try: - self._send_heartbeat() - except TimeoutError: - pass - if self._adrenaline.is_set(): - interval_in_seconds = self._hurry_interval_in_seconds - else: - interval_in_seconds = self._relax_interval_in_seconds - self._rest.wait(interval_in_seconds) - except ConnectionError: - pass - - def relax(self): - self._adrenaline.clear() - - def hurry(self): - self._adrenaline.set() - self._rest.set() - self._rest.clear() - - def stop(self): - self._rest.set() - self._stop.set() - - -def find_callback(args, kw=None): - 'Return callback whether passed as a last argument or as a keyword' - if args and callable(args[-1]): - return args[-1], args[:-1] - try: - return kw['callback'], args - except (KeyError, TypeError): - return None, args - - -def _parse_host(host, port, resource): - if not host.startswith('http'): - host = 'http://' + host - url_pack = parse_url(host) - is_secure = url_pack.scheme == 'https' - port = port or url_pack.port or (443 if is_secure else 80) - url = '%s:%d%s/%s' % (url_pack.hostname, port, url_pack.path, resource) - return is_secure, url - - -def _decode_engineIO_content(content): - packets = [] - content_index = 0 - content_length = len(content) - while content_index < content_length: - try: - content_index, packet_length = _read_packet_length( - content, content_index) - except IndexError: - break - content_index, packet_string = _read_packet_string( - content, content_index, packet_length) - packet_type = int(get_character(packet_string, 0)) - packet_data = packet_string[1:] - packets.append((packet_type, packet_data)) - return packets - - -def _encode_engineIO_content(packets): - parts = [] - for packet_type, packet_data in packets: - packet_string = str(packet_type) + str(packet_data) - parts.append(_make_packet_header(packet_string) + packet_string) - return ''.join(parts) - - -def _read_packet_length(content, content_index): - while get_byte(content, content_index) != 0: - content_index += 1 - content_index += 1 - packet_length_string = '' - byte = get_byte(content, content_index) - while byte != 255: - packet_length_string += str(byte) - content_index += 1 - byte = get_byte(content, content_index) - return content_index, int(packet_length_string) - - -def _read_packet_string(content, content_index, packet_length): - while get_byte(content, content_index) == 255: - content_index += 1 - packet_string = content[content_index:content_index + packet_length] - return content_index + packet_length, packet_string - - -def _make_packet_header(packet_string): - length_string = str(len(packet_string)) - header_digits = [0] - for i in range(len(length_string)): - header_digits.append(ord(length_string[i]) - 48) - header_digits.append(255) - return ''.join(chr(x) for x in header_digits) - - -def _parse_socketIO_data(data): - data = get_unicode(data) - if data.startswith('/'): - try: - path, data = data.split(',', 1) - except ValueError: - path = data - data = '' - else: - path = '' - try: - ack_id = int(data[0]) - data = data[1:] - except (ValueError, IndexError): - ack_id = None - try: - args = json.loads(data) - except ValueError: - args = [] - return SocketIOData(path=path, ack_id=ack_id, args=args) - - -def _yield_warning_screen(seconds=None): - last_warning = None - for elapsed_time in _yield_elapsed_time(seconds): - try: - yield elapsed_time - except Exception as warning: - warning = str(warning) - if last_warning != warning: - last_warning = warning - _log.warn(warning) - time.sleep(RETRY_INTERVAL_IN_SECONDS) - - -def _yield_elapsed_time(seconds=None): - start_time = time.time() - if seconds is None: - while True: - yield time.time() - start_time - while time.time() - start_time < seconds: - yield time.time() - start_time + def _get_ack_callback(self, ack_id): + return self._callback_by_ack_id.pop(ack_id) diff --git a/socketIO_client/heartbeats.py b/socketIO_client/heartbeats.py new file mode 100644 index 0000000..f1de194 --- /dev/null +++ b/socketIO_client/heartbeats.py @@ -0,0 +1,47 @@ +from threading import Thread, Event + +from .exceptions import ConnectionError, TimeoutError + + +class HeartbeatThread(Thread): + + daemon = True + + def __init__( + self, send_heartbeat, + relax_interval_in_seconds, + hurry_interval_in_seconds): + super(HeartbeatThread, self).__init__() + self._send_heartbeat = send_heartbeat + self._relax_interval_in_seconds = relax_interval_in_seconds + self._hurry_interval_in_seconds = hurry_interval_in_seconds + self._adrenaline = Event() + self._rest = Event() + self._stop = Event() + + def run(self): + try: + while not self._stop.is_set(): + try: + self._send_heartbeat() + except TimeoutError: + pass + if self._adrenaline.is_set(): + interval_in_seconds = self._hurry_interval_in_seconds + else: + interval_in_seconds = self._relax_interval_in_seconds + self._rest.wait(interval_in_seconds) + except ConnectionError: + pass + + def relax(self): + self._adrenaline.clear() + + def hurry(self): + self._adrenaline.set() + self._rest.set() + self._rest.clear() + + def stop(self): + self._rest.set() + self._stop.set() diff --git a/socketIO_client/logs.py b/socketIO_client/logs.py new file mode 100644 index 0000000..ff37d19 --- /dev/null +++ b/socketIO_client/logs.py @@ -0,0 +1,38 @@ +import logging +import time + + +class LoggingMixin(object): + + def _log(self, level, msg, *attrs): + logging.log(level, '%s %s' % (self._log_name, msg), *attrs) + + def _debug(self, msg, *attrs): + self._log(logging.DEBUG, msg, *attrs) + + def _info(self, msg, *attrs): + self._log(logging.INFO, msg, *attrs) + + def _warn(self, msg, *attrs): + self._log(logging.WARNING, msg, *attrs) + + def _yield_warning_screen(self, seconds=None): + last_warning = None + for elapsed_time in _yield_elapsed_time(seconds): + try: + yield elapsed_time + except Exception as warning: + warning = str(warning) + if last_warning != warning: + last_warning = warning + self._warn(warning) + time.sleep(1) + + +def _yield_elapsed_time(seconds=None): + start_time = time.time() + if seconds is None: + while True: + yield time.time() - start_time + while time.time() - start_time < seconds: + yield time.time() - start_time diff --git a/socketIO_client/namespaces.py b/socketIO_client/namespaces.py new file mode 100644 index 0000000..db96007 --- /dev/null +++ b/socketIO_client/namespaces.py @@ -0,0 +1,198 @@ +from .logs import LoggingMixin + + +class EngineIONamespace(LoggingMixin): + 'Define engine.io client behavior' + + def __init__(self, io): + self._io = io + self._callback_by_event = {} + self._log_name = io._url + self.initialize() + + def initialize(self): + """Initialize custom variables here. + You can override this method.""" + + def on(self, event, callback): + 'Define a callback to handle an event emitted by the server' + self._callback_by_event[event] = callback + + def on_open(self): + """Called after engine.io connects. + You can override this method.""" + + def on_close(self): + """Called after engine.io disconnects. + You can override this method.""" + + def on_ping(self, data): + """Called after engine.io sends a ping packet. + You can override this method.""" + + def on_pong(self, data): + """Called after engine.io sends a pong packet. + You can override this method.""" + + def on_message(self, data): + """Called after engine.io sends a message packet. + You can override this method.""" + + def on_upgrade(self): + """Called after engine.io sends an upgrade packet. + You can override this method.""" + + def on_noop(self): + """Called after engine.io sends a noop packet. + You can override this method.""" + + def _find_packet_callback(self, event): + # Check callbacks defined by on() + try: + return self._callback_by_event[event] + except KeyError: + pass + # Check callbacks defined explicitly + return getattr(self, 'on_' + event) + + +class SocketIONamespace(EngineIONamespace): + 'Define socket.io client behavior' + + def __init__(self, io, path): + self.path = path + super(SocketIONamespace, self).__init__(io) + + def on_connect(self): + """Called after socket.io connects. + You can override this method.""" + + def on_reconnect(self): + """Called after socket.io reconnects. + You can override this method.""" + + def on_disconnect(self): + """Called after socket.io disconnects. + You can override this method.""" + + def on_event(self, event, *args): + """ + Called if there is no matching event handler. + You can override this method. + There are three ways to define an event handler: + + - Call socketIO.on() + + socketIO = SocketIO('localhost', 8000) + socketIO.on('my_event', my_function) + + - Call namespace.on() + + namespace = socketIO.get_namespace() + namespace.on('my_event', my_function) + + - Define namespace.on_xxx + + class Namespace(SocketIONamespace): + + def on_my_event(self, *args): + my_function(*args) + + socketIO.define(Namespace)""" + + def on_error(self, data): + """Called after socket.io sends an error packet. + You can override this method.""" + + def _find_packet_callback(self, event): + # Interpret events + if event == 'connect': + if not hasattr(self, '_was_connected'): + self._was_connected = True + else: + event = 'reconnect' + # Check callbacks defined by on() + try: + return self._callback_by_event[event] + except KeyError: + pass + # Check callbacks defined explicitly or use on_event() + return getattr( + self, 'on_' + event.replace(' ', '_'), + lambda *args: self.on_event(event, *args)) + + +class LoggingEngineIONamespace(EngineIONamespace): + + def on_open(self): + self._debug('[open]') + super(LoggingEngineIONamespace, self).on_open() + + def on_close(self): + self._debug('[close]') + super(LoggingEngineIONamespace, self).on_close() + + def on_ping(self, data): + self._debug('[ping] %s' % data) + super(LoggingEngineIONamespace, self).on_ping(data) + + def on_pong(self, data): + self._debug('[pong] %s' % data) + super(LoggingEngineIONamespace, self).on_pong(data) + + def on_message(self, data): + self._debug('[message] %s' % data) + super(LoggingEngineIONamespace, self).on_message(data) + + def on_upgrade(self): + self._debug('[upgrade]') + super(LoggingEngineIONamespace, self).on_upgrade() + + def on_noop(self): + self._debug('[noop]') + super(LoggingEngineIONamespace, self).on_noop() + + def on_event(self, event, *args): + callback, args = find_callback(args) + arguments = [repr(_) for _ in args] + if callback: + arguments.append('callback(*args)') + self._info('[event] %s(%s)', event, ', '.join(arguments)) + super(LoggingEngineIONamespace, self).on_event(event, *args) + + +class LoggingSocketIONamespace(SocketIONamespace): + + def on_connect(self): + self._debug('%s [connect]' % self.path) + super(LoggingSocketIONamespace, self).on_connect() + + def on_reconnect(self): + self._debug('%s [reconnect]' % self.path) + super(LoggingSocketIONamespace, self).on_reconnect() + + def on_disconnect(self): + self._debug('%s [disconnect]' % self.path) + super(LoggingSocketIONamespace, self).on_disconnect() + + def on_event(self, event, *args): + callback, args = find_callback(args) + arguments = [repr(_) for _ in args] + if callback: + arguments.append('callback(*args)') + self._info('%s [event] %s(%s)', self.path, event, ', '.join(arguments)) + super(LoggingSocketIONamespace, self).on_event(event, *args) + + def on_error(self, data): + self._debug('%s [error] %s' % (self.path, data)) + super(LoggingSocketIONamespace, self).on_error() + + +def find_callback(args, kw=None): + 'Return callback whether passed as a last argument or as a keyword' + if args and callable(args[-1]): + return args[-1], args[:-1] + try: + return kw['callback'], args + except (KeyError, TypeError): + return None, args diff --git a/socketIO_client/parsers.py b/socketIO_client/parsers.py new file mode 100644 index 0000000..7a5a9d7 --- /dev/null +++ b/socketIO_client/parsers.py @@ -0,0 +1,93 @@ +import json +from collections import namedtuple + +from .symmetries import encode_string, get_byte, get_character, parse_url + + +SocketIOData = namedtuple('SocketIOData', ['path', 'ack_id', 'args']) + + +def parse_host(host, port, resource): + if not host.startswith('http'): + host = 'http://' + host + url_pack = parse_url(host) + is_secure = url_pack.scheme == 'https' + port = port or url_pack.port or (443 if is_secure else 80) + url = '%s:%d%s/%s' % (url_pack.hostname, port, url_pack.path, resource) + return is_secure, url + + +def encode_engineIO_content(engineIO_packets): + parts = [] + for packet_type, packet_data in engineIO_packets: + packet_string = str(packet_type) + encode_string(packet_data) + parts.append(_make_packet_header(packet_string) + packet_string) + return ''.join(parts) + + +def decode_engineIO_content(content): + content_index = 0 + content_length = len(content) + while content_index < content_length: + try: + content_index, packet_length = _read_packet_length( + content, content_index) + except IndexError: + break + content_index, packet_string = _read_packet_string( + content, content_index, packet_length) + engineIO_packet_type = int(get_character(packet_string, 0)) + engineIO_packet_data = packet_string[1:] + yield engineIO_packet_type, engineIO_packet_data + + +def parse_socketIO_data(data): + data = encode_string(data) + if data.startswith('/'): + try: + path, data = data.split(',', 1) + except ValueError: + path = data + data = '' + else: + path = '' + try: + ack_id_string, data = data.split('[', 1) + data = '[' + data + ack_id = int(ack_id_string) + except (ValueError, IndexError): + ack_id = None + try: + args = json.loads(data) + except ValueError: + args = [] + return SocketIOData(path=path, ack_id=ack_id, args=args) + + +def _make_packet_header(packet_string): + length_string = str(len(packet_string)) + header_digits = [0] + for i in range(len(length_string)): + header_digits.append(ord(length_string[i]) - 48) + header_digits.append(255) + return ''.join(chr(x) for x in header_digits) + + +def _read_packet_length(content, content_index): + while get_byte(content, content_index) != 0: + content_index += 1 + content_index += 1 + packet_length_string = '' + byte = get_byte(content, content_index) + while byte != 255: + packet_length_string += str(byte) + content_index += 1 + byte = get_byte(content, content_index) + return content_index, int(packet_length_string) + + +def _read_packet_string(content, content_index, packet_length): + while get_byte(content, content_index) == 255: + content_index += 1 + packet_string = content[content_index:content_index + packet_length] + return content_index + packet_length, packet_string diff --git a/socketIO_client/compat.py b/socketIO_client/symmetries.py similarity index 79% rename from socketIO_client/compat.py rename to socketIO_client/symmetries.py index 012993c..b061a20 100644 --- a/socketIO_client/compat.py +++ b/socketIO_client/symmetries.py @@ -13,5 +13,9 @@ def get_character(x, index): return chr(six.indexbytes(x, index)) -def get_unicode(x): +def encode_string(x): + return x.encode('utf-8') + + +def decode_string(x): return x.decode('utf-8') diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index fe641a7..f917b8a 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -1,53 +1,81 @@ import requests +import time + from .exceptions import ConnectionError, TimeoutError +from .parsers import decode_engineIO_content, encode_engineIO_content ENGINEIO_PROTOCOL = 3 +TRANSPORTS = 'websocket', 'xhr-polling' class AbstractTransport(object): - pass + + def __init__(self, http_session, is_secure, url, engineIO_session=None): + self.http_session = http_session + self.is_secure = is_secure + self.url = url + self.engineIO_session = engineIO_session + + def send_packet(self, engineIO_packet_type, engineIO_packet_data): + pass + + def recv_packet(self): + pass class XHR_PollingTransport(AbstractTransport): - # pass http session - # pass session id (can be none) - # pass timeout + def __init__(self, http_session, is_secure, url, engineIO_session=None): + super(XHR_PollingTransport, self).__init__( + http_session, is_secure, url, engineIO_session) + self.http_url = '%s://%s/' % ('https' if is_secure else 'http', url) + self.params = { + 'EIO': ENGINEIO_PROTOCOL, + 'transport': 'polling', + } + if engineIO_session: + self.request_index = 1 + self.kw_get = dict(timeout=engineIO_session.ping_timeout) + self.kw_post = dict(headers={ + 'content-type': 'application/octet-stream', + }) + self.params['sid'] = engineIO_session.id + else: + self.request_index = 0 + self.kw_get = {} + self.kw_post = {} + + def recv_packet(self): + params = dict(self.params) + params['t'] = self.get_timestamp() + response = get_response( + self.http_session.get, + self.http_url, + params=params, + **self.kw_get) + return decode_engineIO_content(response.content) def send_packet(self, engineIO_packet_type, engineIO_packet_data): - _get_response() - - assert ok - - response = self._http_session.post(self._url, params={ - 'EIO': self._engineIO_protocol, - 'transport': 'polling', - 't': self._get_timestamp(), - 'sid': self._session_id, - }, data=_encode_engineIO_content([ - (engineIO_packet_type, engineIO_packet_data), - ]), headers={ - 'content-type': 'application/octet-stream', - }) + params = dict(self.params) + params['t'] = self.get_timestamp() + response = get_response( + self.http_session.post, + self.http_url, + params=params, + data=encode_engineIO_content([ + (engineIO_packet_type, engineIO_packet_data), + ]), + **self.kw_post) assert response.content == 'ok' - def _recv_packet(self): - response = _get_response( - self._http_session.get, - self._url, - params={ - 'EIO': self._engineIO_protocol, - 'transport': 'polling', - 't': self._get_timestamp(), - 'sid': self._session_id, - }, - timeout=self._ping_timeout) - for engineIO_packet in _decode_engineIO_content(response.content): - yield engineIO_packet + def get_timestamp(self): + timestamp = '%s-%s' % (int(time.time() * 1000), self.request_index) + self.request_index += 1 + return timestamp -def _get_response(request, *args, **kw): +def get_response(request, *args, **kw): try: response = request(*args, **kw) except requests.exceptions.Timeout as e: @@ -60,3 +88,16 @@ def _get_response(request, *args, **kw): if 200 != status_code: raise ConnectionError('unexpected status code (%s)' % status_code) return response + + +def prepare_http_session(kw): + http_session = requests.Session() + http_session.headers.update(kw.get('headers', {})) + http_session.auth = kw.get('auth') + http_session.proxies.update(kw.get('proxies', {})) + http_session.hooks.update(kw.get('hooks', {})) + http_session.params.update(kw.get('params', {})) + http_session.verify = kw.get('verify') + http_session.cert = kw.get('cert') + http_session.cookies.update(kw.get('cookies', {})) + return http_session