From 37bd6786d735247f28b794594aa79a88cab1a701 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Sat, 20 Dec 2014 16:28:22 -0800 Subject: [PATCH 01/22] Checkpoint --- socketIO_client/__init__.py | 27 ++++++++++++++++------- socketIO_client/parser.py | 41 +++++++++++++++++++++++++++++++++++ socketIO_client/transports.py | 5 ++++- 3 files changed, 64 insertions(+), 9 deletions(-) create mode 100644 socketIO_client/parser.py diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index e966e3a..e4d5fa1 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,8 +1,10 @@ +from collections import namedtuple import logging import json +import parser import requests import time -from collections import namedtuple + try: from urllib import parse as parse_url except ImportError: @@ -367,8 +369,7 @@ def _parse_host(host, port): url_pack = parse_url(host) is_secure = url_pack.scheme == 'https' port = port or url_pack.port or (443 if is_secure else 80) - base_url = '%s:%d%s/socket.io/%s' % ( - url_pack.hostname, port, url_pack.path, PROTOCOL_VERSION) + base_url = '%s:%d%s/socket.io/%s' % (url_pack.hostname, port, url_pack.path, PROTOCOL_VERSION) return is_secure, base_url @@ -395,13 +396,23 @@ def _yield_elapsed_time(seconds=None): def _get_socketIO_session(is_secure, base_url, **kw): - server_url = '%s://%s/' % ('https' if is_secure else 'http', base_url) + server_url = '%s://%s/?transport=polling' % ('https' if is_secure else 'http', base_url) + _log.debug('[session] %s', server_url) try: response = _get_response(requests.get, server_url, **kw) except TimeoutError as e: raise ConnectionError(e) - response_parts = response.text.split(':') + + _log.debug("[response] %s", response.text); + decoded = parser.decode_response(response); + _log.debug("[decoded] %s", repr(decoded)); + return _SocketIOSession( - id=response_parts[0], - heartbeat_timeout=int(response_parts[1]), - server_supported_transports=response_parts[3].split(',')) + id = decoded["payload"]["sid"], + heartbeat_timeout = int(decoded["payload"]["pingInterval"]), + server_supported_transports = ["xhr-polling"]);#decoded["payload"]["upgrades"]); + + #return _SocketIOSession( + # id=response_parts[0], + # heartbeat_timeout=int(response_parts[1]), + # server_supported_transports=response_parts[3].split(',')) diff --git a/socketIO_client/parser.py b/socketIO_client/parser.py new file mode 100644 index 0000000..2231481 --- /dev/null +++ b/socketIO_client/parser.py @@ -0,0 +1,41 @@ +import logging +import json + +_log = logging.getLogger(__name__) + +""" Decodes a response from requests lib. +""" +def decode_response(response): + # TODO(sean): Should we use the 'raw' stream instead? + raw_bytes = response.content; + packet_type = "string" if ord(raw_bytes[0]) == 0 else "binary"; + _log.debug("Packet type: %s" % packet_type); + + if packet_type is "string": + length_bytes = []; + offset = 1; + while ord(raw_bytes[offset]) is not 255: + length_bytes.append(ord(raw_bytes[offset])); + offset += 1; + offset += 1; + + length = 0; + base = 1; + for digit in reversed(length_bytes): + length += (int(digit) * base); + base *= 10; + _log.debug("Packet length: %d" % length); + + message_type = raw_bytes[offset]; + offset += 1; + + message = {"type": message_type, "payload": json.loads(raw_bytes[offset:offset + length - 1])}; + _log.debug("Message: %s" % repr(message)); + return message; + else: + pass; + + return ""; + +def decode_packet(packet): + pass; diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 172ca15..e1a0064 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -1,5 +1,6 @@ import json import logging +import parser import re import requests import six @@ -79,6 +80,7 @@ class _AbstractTransport(object): for packet_text in self.recv(): _log.debug('[packet received] %s', packet_text) try: + #packet = parser.decode_response(packet_text); packet_parts = packet_text.split(':', 3) except AttributeError: _log.warn('[packet error] %s', packet_text) @@ -167,7 +169,7 @@ class _XHR_PollingTransport(_AbstractTransport): def __init__(self, socketIO_session, is_secure, base_url, **kw): super(_XHR_PollingTransport, self).__init__() - self._url = '%s://%s/xhr-polling/%s' % ( + self._url = '%s://%s/?transport=polling&sid=%s' % ( 'https' if is_secure else 'http', base_url, socketIO_session.id) self._connected = True @@ -198,6 +200,7 @@ class _XHR_PollingTransport(_AbstractTransport): self._url, params=self._params, timeout=TIMEOUT_IN_SECONDS) + #response_text = response.content response_text = response.text if not response_text.startswith(BOUNDARY): yield response_text From d64e947aac64f0c0b5001feb4294a832d494c22d Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Mon, 22 Dec 2014 00:36:09 -0800 Subject: [PATCH 02/22] Checkpoint for updates. Namespaces are working. Events are working. Reconnects working. Disconnects working. Need to implemenet ACKs / callbacks, WebSocket transport, and JSONP transport --- socketIO_client/__init__.py | 124 ++++++++++++++++++++------- socketIO_client/parser.py | 117 +++++++++++++++++++++---- socketIO_client/transports.py | 155 +++++++++++++++++++++++----------- 3 files changed, 302 insertions(+), 94 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index e4d5fa1..bd3556a 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -1,6 +1,8 @@ from collections import namedtuple +import copy import logging import json +import multiprocessing import parser import requests import time @@ -16,7 +18,8 @@ from .transports import _get_response, _negotiate_transport, TRANSPORTS _SocketIOSession = namedtuple('_SocketIOSession', [ 'id', - 'heartbeat_timeout', + 'heartbeat_interval', + 'connection_timeout', 'server_supported_transports', ]) _log = logging.getLogger(__name__) @@ -42,7 +45,11 @@ class BaseNamespace(object): def emit(self, event, *args, **kw): callback, args = find_callback(args, kw) - self._transport.emit(self.path, event, args, callback) + + if callback is not None: + _log.warn("Callback was specified but is not supported."); + + self._transport.emit(self.path, event, args, None) def disconnect(self): self._transport.disconnect(self.path) @@ -138,6 +145,16 @@ class SocketIO(object): self._namespace_by_path = {} self.client_supported_transports = transports self.kw = kw + # These two fields work to control the heartbeat thread. + self.heartbeat_terminator = None; + self.heartbeat_thread = None; + # Saved session information. + self.session = None; + # This is stores the set of paths (namespaces) that need to be + # reconnected to. + self.reconnect_paths = {}; + # This sets of a chain of events that attempts to connect to + # the server at the base namespace. self.define(Namespace) def __enter__(self): @@ -145,9 +162,17 @@ class SocketIO(object): def __exit__(self, *exception_pack): self.disconnect() + self._terminate_heartbeat(); def __del__(self): self.disconnect() + self._terminate_heartbeat(); + + def _terminate_heartbeat(self): + if self.heartbeat_terminator is not None: + self.heartbeat_terminator.set(); + #time.sleep(self.session.heartbeat_interval); + self.heartbeat_thread.join(); def define(self, Namespace, path=''): if path: @@ -167,6 +192,19 @@ class SocketIO(object): callback, args = find_callback(args, kw) self._transport.emit(path, event, args, callback) + def reconnect(self): + """Reconnects to a set of namespaces. + + """ + for path in self.reconnect_paths: + # We avoid reconnecting to the default namespace because + # socketIO_client connects to that already. + if (len(self.reconnect_paths) > 1 and path is ''): + continue; + _log.debug("Reconnecting to path: %s" % repr(path)) + self._transport.connect(path); + self.reconnect_paths = {}; + def wait(self, seconds=None, for_callbacks=False): """Wait in a loop and process events as defined in the namespaces. @@ -181,14 +219,28 @@ class SocketIO(object): pass if self._stop_waiting(for_callbacks): break - self.heartbeat_pacemaker.send(elapsed_time) + + # We will end up here in the case that we + # disconnected, then reconnected AND we were + # successful. + if len(self.reconnect_paths) > 0: + self.reconnect(); except ConnectionError as e: try: + # This is where we end up if the connection was + # severed. The client will disconnect here. + if len(self.reconnect_paths) is 0: + self.reconnect_paths = copy.deepcopy(self._namespace_by_path); + + self._terminate_heartbeat(); + warning = Exception('[connection error] %s' % e) + self._transport._connected = False; warning_screen.throw(warning) except StopIteration: _log.warn(warning) self.disconnect() + _log.debug("[wait canceled]"); def _process_events(self): for packet in self._transport.recv_packet(): @@ -249,31 +301,29 @@ class SocketIO(object): return self.__transport def _get_transport(self): - socketIO_session = _get_socketIO_session( - self.is_secure, self.base_url, **self.kw) - _log.debug('[transports available] %s', ' '.join( - socketIO_session.server_supported_transports)) - # Initialize heartbeat_pacemaker - self.heartbeat_pacemaker = self._make_heartbeat_pacemaker( - heartbeat_interval=socketIO_session.heartbeat_timeout / 2) - next(self.heartbeat_pacemaker) + self.session = _get_socketIO_session(self.is_secure, self.base_url, **self.kw) + _log.debug('[transports available] %s', ' '.join(self.session.server_supported_transports)) + # Negotiate transport transport = _negotiate_transport( - self.client_supported_transports, socketIO_session, + self.client_supported_transports, self.session, self.is_secure, self.base_url, **self.kw) # Update namespaces for path, namespace in self._namespace_by_path.items(): namespace._transport = transport transport.connect(path) - return transport + + transport.set_timeout(self.session.connection_timeout); - def _make_heartbeat_pacemaker(self, heartbeat_interval): - heartbeat_time = 0 - while True: - elapsed_time = (yield) - if elapsed_time - heartbeat_time > heartbeat_interval: - heartbeat_time = elapsed_time - self._transport.send_heartbeat() + # Start the heartbeat pacemaker (PING). + _log.debug("[start heartbeat pacemaker]"); + self.heartbeat_terminator = multiprocessing.Event(); + self.heartbeat_thread = multiprocessing.Process( + target = _make_heartbeat_pacemaker, + args = (self.heartbeat_terminator, transport, self.session.heartbeat_interval / 2)); + self.heartbeat_thread.start(); + + return transport def get_namespace(self, path=''): try: @@ -369,7 +419,7 @@ def _parse_host(host, port): url_pack = parse_url(host) is_secure = url_pack.scheme == 'https' port = port or url_pack.port or (443 if is_secure else 80) - base_url = '%s:%d%s/socket.io/%s' % (url_pack.hostname, port, url_pack.path, PROTOCOL_VERSION) + base_url = '%s:%d%s/socket.io' % (url_pack.hostname, port, url_pack.path) return is_secure, base_url @@ -396,7 +446,8 @@ def _yield_elapsed_time(seconds=None): def _get_socketIO_session(is_secure, base_url, **kw): - server_url = '%s://%s/?transport=polling' % ('https' if is_secure else 'http', base_url) + server_url = '%s://%s/?EIO=%d&transport=polling' \ + % ('https' if is_secure else 'http', base_url, parser.ENGINE_PROTOCOL) _log.debug('[session] %s', server_url) try: response = _get_response(requests.get, server_url, **kw) @@ -404,15 +455,28 @@ def _get_socketIO_session(is_secure, base_url, **kw): raise ConnectionError(e) _log.debug("[response] %s", response.text); - decoded = parser.decode_response(response); - _log.debug("[decoded] %s", repr(decoded)); + packet = parser.decode_response(response); + _log.debug("[decoded] %s", repr(packet)); + + if packet.type is not parser.PacketType.OPEN: + _log.warn("Got unexpected packet during connection handshake: %d" % packet.type); + return None; + + handshake = json.loads(packet.payload); return _SocketIOSession( - id = decoded["payload"]["sid"], - heartbeat_timeout = int(decoded["payload"]["pingInterval"]), + id = handshake["sid"], + heartbeat_interval = int(handshake["pingInterval"]) / 1000, + connection_timeout = int(handshake["pingTimeout"]) / 1000, server_supported_transports = ["xhr-polling"]);#decoded["payload"]["upgrades"]); - #return _SocketIOSession( - # id=response_parts[0], - # heartbeat_timeout=int(response_parts[1]), - # server_supported_transports=response_parts[3].split(',')) +def _make_heartbeat_pacemaker(terminator, transport, heartbeat_interval): + while True: + if terminator.wait(heartbeat_interval): + break; + _log.debug("[hearbeat]"); + try: + transport.send_heartbeat(); + except: + pass; + _log.debug("[heartbeat terminated]"); diff --git a/socketIO_client/parser.py b/socketIO_client/parser.py index 2231481..6896c40 100644 --- a/socketIO_client/parser.py +++ b/socketIO_client/parser.py @@ -1,21 +1,94 @@ +from enum import Enum import logging import json _log = logging.getLogger(__name__) -""" Decodes a response from requests lib. -""" -def decode_response(response): - # TODO(sean): Should we use the 'raw' stream instead? - raw_bytes = response.content; - packet_type = "string" if ord(raw_bytes[0]) == 0 else "binary"; - _log.debug("Packet type: %s" % packet_type); +ENGINE_PROTOCOL = 3; - if packet_type is "string": +class PacketType(Enum): + OPEN = 0; + CLOSE = 1; + PING = 2; + PONG = 3; + MESSAGE = 4; + UPGRADE = 5; + NOOP = 6; + +class MessageType(Enum): + CONNECT = 0; + DISCONNECT = 1; + EVENT = 2; + ACK = 3; + ERROR = 4; + BINARY_EVENT = 5; + BINARY_ACK = 6; + +class Packet(): + def __init__(self, packet_type, payload): + self.type = packet_type; + self.payload = payload; + +class Message(): + def __init__(self, message_type, message, path = ""): + self.type = message_type; + if isinstance(message, basestring): + try: + self.message = json.loads(message); + except: + self.message = message; + else: + self.message = message; + + self.path = path; + + def encode_as_json(self): + """Encodes a Message to be sent to socket.io server. + + Assumes the message payload will be dumped as a json string. + """ + if self.path == "": + return str(self.type) + json.dumps(self.message); + return str(self.type) + self.path + "," + json.dumps(self.message); + + def encode_as_string(self): + """Same as the encode_as_string method except it doesn't encode things as a JSON string""" + if self.path == "": + return str(self.type) + self.message; + return str(self.type) + self.path + "," + self.message; + +def decode_message(payload): + """ Decodes a message encoded via socket.io + """ + + message_type = int(payload[0]); + message = payload[1:]; + + return Message(message_type, message); + +def decode_response(response): + """Decodes a response from requests lib. + + """ + # TODO(sean): Should we use the 'raw' stream instead? + return decode_packet(response.content); + +def decode_packet(packet): + """Decodes a packet sent via engine.io. + + If the packet is a message, this method assumes the message was + encoded by socket.io and will parse it as such. + + """ + + packet_format = "string" if ord(packet[0]) == 0 else "binary"; + _log.debug("Packet type: %s" % packet_format); + + if packet_format is "string": length_bytes = []; offset = 1; - while ord(raw_bytes[offset]) is not 255: - length_bytes.append(ord(raw_bytes[offset])); + while ord(packet[offset]) is not 255: + length_bytes.append(ord(packet[offset])); offset += 1; offset += 1; @@ -26,16 +99,30 @@ def decode_response(response): base *= 10; _log.debug("Packet length: %d" % length); - message_type = raw_bytes[offset]; + packet_type = int(packet[offset]); offset += 1; - message = {"type": message_type, "payload": json.loads(raw_bytes[offset:offset + length - 1])}; - _log.debug("Message: %s" % repr(message)); - return message; + payload = packet[offset:offset + length - 1]; + _log.debug("Payload: %s" % repr(payload)); + + if packet_type is PacketType.MESSAGE: + message = decode_message(payload); + payload = message; + + return Packet(packet_type, payload); else: pass; return ""; -def decode_packet(packet): pass; + +def encode_packet_string(code, path, data): + """Encodes packet to be sent to socket.io server. + """ + + code_length = len(str(code)); + data_length = len(data); + length = code_length + data_length; + + return str(length) + ":" + str(code) + str(data); diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index e1a0064..a2f37e8 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -1,6 +1,7 @@ import json import logging import parser +from parser import Message, MessageType, PacketType import re import requests import six @@ -13,7 +14,7 @@ from .exceptions import SocketIOError, ConnectionError, TimeoutError TRANSPORTS = 'websocket', 'xhr-polling', 'jsonp-polling' BOUNDARY = six.u('\ufffd') -TIMEOUT_IN_SECONDS = 3 +TIMEOUT_IN_SECONDS = 300 _log = logging.getLogger(__name__) @@ -24,6 +25,10 @@ class _AbstractTransport(object): self._callback_by_packet_id = {} self._wants_to_disconnect = False self._packets = [] + self._timeout = TIMEOUT_IN_SECONDS; + + def set_timeout(self, timeout): + self._timeout = timeout; def disconnect(self, path=''): if not path: @@ -31,15 +36,20 @@ class _AbstractTransport(object): if not self.connected: return if path: - self.send_packet(0, path) + self.send_packet(PacketType.CLOSE, path) else: self.close() def connect(self, path): - self.send_packet(1, path) + if path != "": + _log.debug("Connecting to path: %s" % path); + data = Message(MessageType.CONNECT, path).encode_as_string(); + self.send_packet(PacketType.MESSAGE, path, data); + else: + self.send_packet(PacketType.OPEN, path, data); def send_heartbeat(self): - self.send_packet(2) + self.send_packet(PacketType.PING) def message(self, path, data, callback): if isinstance(data, basestring): @@ -50,8 +60,8 @@ class _AbstractTransport(object): self.send_packet(code, path, data, callback) def emit(self, path, event, args, callback): - data = json.dumps(dict(name=event, args=args), ensure_ascii=False) - self.send_packet(5, path, data, callback) + message = Message(MessageType.EVENT, [event, args], path); + self.send_packet(PacketType.MESSAGE, path, message.encode_as_json(), callback) def ack(self, path, packet_id, *args): packet_id = packet_id.rstrip('+') @@ -59,15 +69,13 @@ class _AbstractTransport(object): packet_id, json.dumps(args, ensure_ascii=False), ) if args else packet_id - self.send_packet(6, path, data) + #self.send_packet(6, path, data) def noop(self, path=''): - self.send_packet(8, path) + self.send_packet(PacketType.NOOP, path) def send_packet(self, code, path='', data='', callback=None): - packet_id = self.set_ack_callback(callback) if callback else '' - packet_parts = str(code), packet_id, path, data - packet_text = ':'.join(packet_parts) + packet_text = parser.encode_packet_string(code, path, data); self.send(packet_text) _log.debug('[packet sent] %s', packet_text) @@ -77,22 +85,48 @@ class _AbstractTransport(object): yield self._packets.pop(0) except IndexError: pass - for packet_text in self.recv(): - _log.debug('[packet received] %s', packet_text) + for response in self.recv(): + _log.debug('[packet received] %s', response.text); try: - #packet = parser.decode_response(packet_text); - packet_parts = packet_text.split(':', 3) + packet = parser.decode_response(response); except AttributeError: - _log.warn('[packet error] %s', packet_text) + _log.warn('[packet error] %s', response.text) continue - code, packet_id, path, data = None, None, None, None - packet_count = len(packet_parts) - if 4 == packet_count: - code, packet_id, path, data = packet_parts - elif 3 == packet_count: - code, packet_id, path = packet_parts - elif 1 == packet_count: - code = packet_parts[0] + code, packet_id, path, data = None, None, '', None + + if packet.type is PacketType.OPEN: + code = '1'; + continue; + elif packet.type is PacketType.CLOSE: + code = '0'; + elif packet.type is PacketType.PING: + code = '2'; + elif packet.type is PacketType.PONG: + code = '2'; + elif packet.type is PacketType.UPGRADE: + _log.warn("Don't know how to handle upgrade packets"); + yield code, packet_id, path, data; + elif packet.type is PacketType.NOOP: + code = '8'; + elif packet.type is PacketType.MESSAGE: + if packet.payload.type is MessageType.CONNECT: + code = '1'; + elif packet.payload.type is MessageType.DISCONNECT: + code = '0'; + elif packet.payload.type is MessageType.EVENT: + code = '5'; + data = json.dumps({"name": packet.payload.message[0], "args": []}); + elif packet.payload.type is MessageType.ACK: + code = '6'; + elif packet.payload.type is MessageType.ERROR: + code = '7'; + else: + _log.warn("Don't know how to handle message type: %d" % packet.payload.type); + yield code, packet_id, path, data; + else: + _log.warn("Don't know how to handle packet type: %d" % packet.type); + yield code, packet_id, path, data; + yield code, packet_id, path, data def _enqueue_packet(self, packet): @@ -169,14 +203,16 @@ class _XHR_PollingTransport(_AbstractTransport): def __init__(self, socketIO_session, is_secure, base_url, **kw): super(_XHR_PollingTransport, self).__init__() - self._url = '%s://%s/?transport=polling&sid=%s' % ( + self._url = '%s://%s/?EIO=%d&transport=polling&sid=%s' % ( 'https' if is_secure else 'http', - base_url, socketIO_session.id) + base_url, parser.ENGINE_PROTOCOL, socketIO_session.id) self._connected = True self._http_session = _prepare_http_session(kw) + self._waiting = False; + # Create connection - for packet in self.recv_packet(): - self._enqueue_packet(packet) + #for packet in self.recv_packet(): + # self._enqueue_packet(packet) @property def connected(self): @@ -184,35 +220,54 @@ class _XHR_PollingTransport(_AbstractTransport): @property def _params(self): - return dict(t=int(time.time())) + return dict(t=int(time.time() * 1000)) def send(self, packet_text): - _get_response( - self._http_session.post, - self._url, - params=self._params, - data=packet_text, - timeout=TIMEOUT_IN_SECONDS) + uri = self._url + "&" + '&'.join("%s=%s" % (k, v) for (k, v) in self._params.iteritems()); + response = None; + try: + response = requests.post(uri, data=packet_text); + except requests.exceptions.Timeout as e: + message = 'timed out while sending %s (%s)' % (packet_text, e) + _log.warn(message) + raise TimeoutError(e) + except requests.exceptions.ConnectionError as e: + message = 'disconnected while sending %s (%s)' % (packet_text, e) + _log.warn(message) + raise ConnectionError(message) + except requests.exceptions.SSLError as e: + raise ConnectionError('could not negotiate SSL (%s)' % e) + status = response.status_code + if 200 != status: + raise ConnectionError('unexpected status code (%s)' % status) + return response def recv(self): + if self._waiting: + return; + + self._waiting = True; response = _get_response( self._http_session.get, self._url, - params=self._params, - timeout=TIMEOUT_IN_SECONDS) - #response_text = response.content - response_text = response.text - if not response_text.startswith(BOUNDARY): - yield response_text - return - for packet_text in _yield_text_from_framed_data(response_text): - yield packet_text + params = self._params, + timeout = self._timeout) + + self._waiting = False; + if response is None: + return; + + response_text = response; + #response_text = response.text + #if not response_text.startswith(BOUNDARY): + yield response_text + return + #for packet_text in _yield_text_from_framed_data(response_text): + # yield packet_text def close(self): - _get_response( - self._http_session.get, - self._url, - params=dict(self._params.items() + [('disconnect', True)])) + self.send_packet(41) + self.send_packet(1) self._connected = False @@ -310,8 +365,10 @@ def _yield_text_from_framed_data(framed_data, parse=lambda x: x): def _get_response(request, *args, **kw): + response = None; try: - response = request(*args, **kw) + response = request(*args, **kw); + response.close(); except requests.exceptions.Timeout as e: raise TimeoutError(e) except requests.exceptions.ConnectionError as e: From 57971b5f71f2724630ec81b6dd5c3e0c0ce1e360 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Mon, 22 Dec 2014 16:29:58 -0800 Subject: [PATCH 03/22] Added support for websockets via upgrade paradigm. Also added support for series of packets in responses rather than assuming single packets each time. Added support for all message fields in socket.io protocol --- socketIO_client/__init__.py | 104 +++++++++++++++------- socketIO_client/parser.py | 99 ++++++++++++++++++--- socketIO_client/transports.py | 157 ++++++++++------------------------ 3 files changed, 205 insertions(+), 155 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index bd3556a..36e2496 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -4,6 +4,7 @@ import logging import json import multiprocessing import parser +from parser import Message, Packet, MessageType, PacketType import requests import time @@ -13,7 +14,7 @@ except ImportError: from urlparse import urlparse as parse_url from .exceptions import ConnectionError, TimeoutError, PacketError -from .transports import _get_response, _negotiate_transport, TRANSPORTS +from .transports import _get_response _SocketIOSession = namedtuple('_SocketIOSession', [ @@ -139,11 +140,10 @@ class SocketIO(object): def __init__( self, host, port=None, Namespace=BaseNamespace, - wait_for_connection=True, transports=TRANSPORTS, **kw): + wait_for_connection=True, **kw): self.is_secure, self.base_url = _parse_host(host, port) self.wait_for_connection = wait_for_connection self._namespace_by_path = {} - self.client_supported_transports = transports self.kw = kw # These two fields work to control the heartbeat thread. self.heartbeat_terminator = None; @@ -250,10 +250,15 @@ class SocketIO(object): _log.warn('[packet error] %s', e) def _process_packet(self, packet): - code, packet_id, path, data = packet + code, packet_id, path, data, p = packet namespace = self.get_namespace(path) - delegate = self._get_delegate(code) - delegate(packet, namespace._find_event_callback) + delegate = None; + try: + delegate = self._get_delegate(code) + except: + pass; + if delegate is not None: + delegate(packet, namespace._find_event_callback) def _stop_waiting(self, for_callbacks): # Use __transport to make sure that we do not reconnect inadvertently @@ -300,14 +305,40 @@ class SocketIO(object): _log.warn(warning) return self.__transport - def _get_transport(self): - self.session = _get_socketIO_session(self.is_secure, self.base_url, **self.kw) - _log.debug('[transports available] %s', ' '.join(self.session.server_supported_transports)) + def _upgrade(self): + websocket = transports.WebsocketTransport(self.session, self.is_secure, self.base_url, **self.kw); + websocket.send_packet(PacketType.PING, "", "probe"); + for packet in websocket.recv_packet(): + _log.debug("[websocket] Packet: %s" % str(packet)); + (code, packet_id, path, data, p) = packet; + if code == PacketType.PONG: + packet = p; + _log.debug("[PONG] %s" % repr(packet)); - # Negotiate transport - transport = _negotiate_transport( - self.client_supported_transports, self.session, - self.is_secure, self.base_url, **self.kw) + self.heartbeat_terminator.set(); + + # Technically we would need to pause the current + # transport (which should be polling in this + # implementation), but since we haven't actually + # started a polling yet, we can upgrade without that. + _log.debug("[upgrading] Sending upgrade request"); + websocket.send_packet(PacketType.UPGRADE); + self._start_heartbeat(websocket); + return websocket; + + def _start_heartbeat(self, transport): + _log.debug("[start heartbeat pacemaker]"); + self.heartbeat_terminator = multiprocessing.Event(); + self.heartbeat_thread = multiprocessing.Process( + target = _make_heartbeat_pacemaker, + args = (self.heartbeat_terminator, transport, self.session.heartbeat_interval / 2)); + self.heartbeat_thread.start(); + + def _get_transport(self): + self.session = _get_socketIO_session(self.is_secure, self.base_url, **self.kw); + + # Negotiate initial transport + transport = transports.XHR_PollingTransport(self.session, self.is_secure, self.base_url, **self.kw); # Update namespaces for path, namespace in self._namespace_by_path.items(): namespace._transport = transport @@ -316,12 +347,17 @@ class SocketIO(object): transport.set_timeout(self.session.connection_timeout); # Start the heartbeat pacemaker (PING). - _log.debug("[start heartbeat pacemaker]"); - self.heartbeat_terminator = multiprocessing.Event(); - self.heartbeat_thread = multiprocessing.Process( - target = _make_heartbeat_pacemaker, - args = (self.heartbeat_terminator, transport, self.session.heartbeat_interval / 2)); - self.heartbeat_thread.start(); + self._start_heartbeat(transport); + + # If websocket is available, upgrade to it immediately. + # TODO(sean): We could run this on a separate thread for + # maximum efficiency although that would require some + # synchronization to ensure buffers are flushed, etc. + if "websocket" in self.session.server_supported_transports: + try: + return self._upgrade(); + except: + pass; return transport @@ -371,10 +407,19 @@ class SocketIO(object): find_event_callback('message')(*args) def _on_event(self, packet, find_event_callback): - code, packet_id, path, data = packet - value_by_name = json.loads(data) - event = value_by_name['name'] - args = value_by_name.get('args', []) + code, packet_id, path, data, p = packet + packet = p; + + + # Accoding to the documentation + # (https://github.com/automattic/socket.io-protocol#event), + # the event name is the first entry in the message array, and + # the arguments are the rest of the entries. + event = packet.payload.message[0]; + args = packet.payload.message[1:] if len(packet.payload.message) > 1 else []; + + _log.debug("[event] %s (%s)" % (repr(event), repr(args))); + if packet_id: args.append(self._prepare_to_send_ack(path, packet_id)) find_event_callback(event)(*args) @@ -455,12 +500,11 @@ def _get_socketIO_session(is_secure, base_url, **kw): raise ConnectionError(e) _log.debug("[response] %s", response.text); - packet = parser.decode_response(response); - _log.debug("[decoded] %s", repr(packet)); - - if packet.type is not parser.PacketType.OPEN: - _log.warn("Got unexpected packet during connection handshake: %d" % packet.type); - return None; + for packet in parser.decode_response(response): + _log.debug("[decoded] %s", str(packet)); + if packet.type is not parser.PacketType.OPEN: + _log.warn("Got unexpected packet during connection handshake: %d" % packet.type); + return None; handshake = json.loads(packet.payload); @@ -468,7 +512,7 @@ def _get_socketIO_session(is_secure, base_url, **kw): id = handshake["sid"], heartbeat_interval = int(handshake["pingInterval"]) / 1000, connection_timeout = int(handshake["pingTimeout"]) / 1000, - server_supported_transports = ["xhr-polling"]);#decoded["payload"]["upgrades"]); + server_supported_transports = handshake["upgrades"]); def _make_heartbeat_pacemaker(terminator, transport, heartbeat_interval): while True: diff --git a/socketIO_client/parser.py b/socketIO_client/parser.py index 6896c40..3e657d0 100644 --- a/socketIO_client/parser.py +++ b/socketIO_client/parser.py @@ -29,8 +29,11 @@ class Packet(): self.type = packet_type; self.payload = payload; + def __str__(self): + return "PACKET{type: " + str(self.type) + ", payload: " + str(self.payload) + "}"; + class Message(): - def __init__(self, message_type, message, path = ""): + def __init__(self, message_type, message, path = "", attachments = "", message_id = None): self.type = message_type; if isinstance(message, basestring): try: @@ -41,6 +44,23 @@ class Message(): self.message = message; self.path = path; + self.attachments = attachments; + self.id = message_id; + + def __str__(self): + if self.id is not None: + return "MESSAGE{" + \ + "id: " + str(self.id) + ", " + \ + "type: " + str(self.type) + ", " + \ + "message: " + str(self.message) + ", " + \ + "path: " + self.path + \ + "}"; + else: + return "MESSAGE{" + \ + "type: " + str(self.type) + ", " + \ + "message: " + str(self.message) + ", " + \ + "path: " + self.path + \ + "}"; def encode_as_json(self): """Encodes a Message to be sent to socket.io server. @@ -61,17 +81,78 @@ def decode_message(payload): """ Decodes a message encoded via socket.io """ - message_type = int(payload[0]); - message = payload[1:]; + _log.debug("[decode payload] %s" % repr(payload)); - return Message(message_type, message); + i = 0; + message_type = int(payload[i]); + message = ""; + path = ""; + attachments = ""; + message_id = None; + + i += 1; + + if len(payload) > i: + if message_type == MessageType.BINARY_EVENT or message_type == MessageType.BINARY_ACK: + while (payload[i] != "-"): + attachments += payload[i]; + i += 1; + + if len(payload) > i: + # This is kind of odd, but it is how socket.io-parser works (see + # https://github.com/Automattic/socket.io-parser/blob/master/index.js#L292 + # @0ae9a4f). + if payload[i] == "/": + if "," in payload: + split_point = payload.index(","); + path = payload[i:split_point]; + i += split_point; + else: + path = payload[i:]; + i += len(path); + + if len(payload) > i: + # This is the same pecularity as above. + if "," in payload[i:]: + split_point = payload.index(","); + message_id = int(payload[i:split_point]); + i += split_point; + + if len(payload) > i: + message = payload[i:]; + + return Message(message_type, message, path, attachments, message_id); def decode_response(response): """Decodes a response from requests lib. """ # TODO(sean): Should we use the 'raw' stream instead? - return decode_packet(response.content); + if isinstance(response, basestring): + _log.debug("[decode response (string)] Response: %s" % str(response)); + packet = decode_packet_string(response); + yield packet; + else: + content = response.content; + total_length = len(content); + processed = 0; + while processed < total_length: + _log.debug("[decode response] Content: %s" % str(content)); + (read, packet) = decode_packet(content); + content = content[read:]; + processed += read; + yield packet; + + +def decode_packet_string(packet): + packet_type = int(packet[0]); + payload = packet[1:]; + + if packet_type == PacketType.MESSAGE: + message = decode_message(payload); + payload = message; + + return Packet(packet_type, payload); def decode_packet(packet): """Decodes a packet sent via engine.io. @@ -108,15 +189,11 @@ def decode_packet(packet): if packet_type is PacketType.MESSAGE: message = decode_message(payload); payload = message; - - return Packet(packet_type, payload); + + return offset + length, Packet(packet_type, payload); else: pass; - return ""; - - pass; - def encode_packet_string(code, path, data): """Encodes packet to be sent to socket.io server. """ diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index a2f37e8..83eb019 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -45,6 +45,19 @@ class _AbstractTransport(object): _log.debug("Connecting to path: %s" % path); data = Message(MessageType.CONNECT, path).encode_as_string(); self.send_packet(PacketType.MESSAGE, path, data); + + # Wait for response. + responded = False; + while not responded: + for packet in self.recv_packet(): + _log.debug("[connect wait] Waiting for confirmation"); + (code, packet_id, ignore, data, p) = packet; + packet = p; + if (packet.type == PacketType.MESSAGE + and packet.payload.type == MessageType.CONNECT + and packet.payload.path == path): + _log.debug("Connected to path: %s" % path); + responded = True; else: self.send_packet(PacketType.OPEN, path, data); @@ -85,27 +98,20 @@ class _AbstractTransport(object): yield self._packets.pop(0) except IndexError: pass - for response in self.recv(): - _log.debug('[packet received] %s', response.text); - try: - packet = parser.decode_response(response); - except AttributeError: - _log.warn('[packet error] %s', response.text) - continue + for packet in self.recv(): code, packet_id, path, data = None, None, '', None if packet.type is PacketType.OPEN: code = '1'; - continue; elif packet.type is PacketType.CLOSE: code = '0'; elif packet.type is PacketType.PING: code = '2'; elif packet.type is PacketType.PONG: - code = '2'; + code = PacketType.PONG; elif packet.type is PacketType.UPGRADE: _log.warn("Don't know how to handle upgrade packets"); - yield code, packet_id, path, data; + yield code, packet_id, path, data, packet; elif packet.type is PacketType.NOOP: code = '8'; elif packet.type is PacketType.MESSAGE: @@ -122,12 +128,12 @@ class _AbstractTransport(object): code = '7'; else: _log.warn("Don't know how to handle message type: %d" % packet.payload.type); - yield code, packet_id, path, data; + yield code, packet_id, path, data, packet; else: _log.warn("Don't know how to handle packet type: %d" % packet.type); - yield code, packet_id, path, data; + yield code, packet_id, path, data, packet; - yield code, packet_id, path, data + yield code, packet_id, path, data, packet def _enqueue_packet(self, packet): self._packets.append(packet) @@ -149,15 +155,17 @@ class _AbstractTransport(object): return True if self._callback_by_packet_id else False -class _WebsocketTransport(_AbstractTransport): +class WebsocketTransport(_AbstractTransport): def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_WebsocketTransport, self).__init__() - url = '%s://%s/websocket/%s' % ( + super(WebsocketTransport, self).__init__() + + self._url = '%s://%s/?EIO=%d&transport=websocket&sid=%s' % ( 'wss' if is_secure else 'ws', - base_url, socketIO_session.id) + base_url, parser.ENGINE_PROTOCOL, socketIO_session.id) + try: - self._connection = websocket.create_connection(url) + self._connection = websocket.create_connection(self._url) except socket.timeout as e: raise ConnectionError(e) except socket.error as e: @@ -168,6 +176,11 @@ class _WebsocketTransport(_AbstractTransport): def connected(self): return self._connection.connected + def send_packet(self, code, path="", data='', callback=None): + packet_text = Message(code, data).encode_as_string(); + self.send(packet_text) + _log.debug('[packet sent] %s', packet_text) + def send(self, packet_text): try: self._connection.send(packet_text) @@ -182,7 +195,14 @@ class _WebsocketTransport(_AbstractTransport): def recv(self): try: - yield self._connection.recv() + response = self._connection.recv(); + try: + for packet in parser.decode_response(response): + _log.debug('[websocket packet received] %s', str(packet)); + yield packet; + except AttributeError: + _log.warn('[packet error] %s', repr(response)) + return; except websocket.WebSocketTimeoutException as e: raise TimeoutError(e) except websocket.SSLError as e: @@ -199,10 +219,10 @@ class _WebsocketTransport(_AbstractTransport): self._connection.close() -class _XHR_PollingTransport(_AbstractTransport): +class XHR_PollingTransport(_AbstractTransport): def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_XHR_PollingTransport, self).__init__() + super(XHR_PollingTransport, self).__init__() self._url = '%s://%s/?EIO=%d&transport=polling&sid=%s' % ( 'https' if is_secure else 'http', base_url, parser.ENGINE_PROTOCOL, socketIO_session.id) @@ -210,10 +230,6 @@ class _XHR_PollingTransport(_AbstractTransport): self._http_session = _prepare_http_session(kw) self._waiting = False; - # Create connection - #for packet in self.recv_packet(): - # self._enqueue_packet(packet) - @property def connected(self): return self._connected @@ -257,102 +273,15 @@ class _XHR_PollingTransport(_AbstractTransport): if response is None: return; - response_text = response; - #response_text = response.text - #if not response_text.startswith(BOUNDARY): - yield response_text + for packet in parser.decode_response(response): + yield packet; return - #for packet_text in _yield_text_from_framed_data(response_text): - # yield packet_text def close(self): self.send_packet(41) self.send_packet(1) self._connected = False - -class _JSONP_PollingTransport(_AbstractTransport): - - RESPONSE_PATTERN = re.compile(r'io.j\[(\d+)\]\("(.*)"\);') - - def __init__(self, socketIO_session, is_secure, base_url, **kw): - super(_JSONP_PollingTransport, self).__init__() - self._url = '%s://%s/jsonp-polling/%s' % ( - 'https' if is_secure else 'http', - base_url, socketIO_session.id) - self._connected = True - self._http_session = _prepare_http_session(kw) - self._id = 0 - # Create connection - for packet in self.recv_packet(): - self._enqueue_packet(packet) - - @property - def connected(self): - return self._connected - - @property - def _params(self): - return dict(t=int(time.time()), i=self._id) - - def send(self, packet_text): - _get_response( - self._http_session.post, - self._url, - params=self._params, - data='d=%s' % requests.utils.quote(json.dumps(packet_text)), - headers={'content-type': 'application/x-www-form-urlencoded'}, - timeout=TIMEOUT_IN_SECONDS) - - def recv(self): - 'Decode the JavaScript response so that we can parse it as JSON' - response = _get_response( - self._http_session.get, - self._url, - params=self._params, - headers={'content-type': 'text/javascript; charset=UTF-8'}, - timeout=TIMEOUT_IN_SECONDS) - response_text = response.text - try: - self._id, response_text = self.RESPONSE_PATTERN.match( - response_text).groups() - except AttributeError: - _log.warn('[packet error] %s', response_text) - return - if not response_text.startswith(BOUNDARY): - yield response_text.decode('unicode_escape') - return - for packet_text in _yield_text_from_framed_data( - response_text, parse=lambda x: x.decode('unicode_escape')): - yield packet_text - - def close(self): - _get_response( - self._http_session.get, - self._url, - params=dict(self._params.items() + [('disconnect', True)])) - self._connected = False - - -def _negotiate_transport( - client_supported_transports, session, - is_secure, base_url, **kw): - server_supported_transports = session.server_supported_transports - for supported_transport in client_supported_transports: - if supported_transport in server_supported_transports: - _log.debug('[transport selected] %s', supported_transport) - return { - 'websocket': _WebsocketTransport, - 'xhr-polling': _XHR_PollingTransport, - 'jsonp-polling': _JSONP_PollingTransport, - }[supported_transport](session, is_secure, base_url, **kw) - raise SocketIOError(' '.join([ - 'could not negotiate a transport:', - 'client supports %s but' % ', '.join(client_supported_transports), - 'server supports %s' % ', '.join(server_supported_transports), - ])) - - def _yield_text_from_framed_data(framed_data, parse=lambda x: x): parts = [parse(x) for x in framed_data.split(BOUNDARY)] for text_length, text in zip(parts[1::2], parts[2::2]): From 07a5cc4c63d85839ce838c93776bcf233032a172 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Mon, 22 Dec 2014 16:49:16 -0800 Subject: [PATCH 04/22] Events are working on a per-namespace basis now --- socketIO_client/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 36e2496..794e158 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -410,7 +410,6 @@ class SocketIO(object): code, packet_id, path, data, p = packet packet = p; - # Accoding to the documentation # (https://github.com/automattic/socket.io-protocol#event), # the event name is the first entry in the message array, and @@ -420,9 +419,12 @@ class SocketIO(object): _log.debug("[event] %s (%s)" % (repr(event), repr(args))); - if packet_id: + if packet.payload.id is not None: args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback(event)(*args) + try: + self._namespace_by_path[packet.payload.path]._find_event_callback(event)(*args); + except KeyError: + _log.error("Could not handle event for unknown path: %s" % packet.payload.path); def _on_ack(self, packet, find_event_callback): code, packet_id, path, data = packet From 16c64437d318bcac87e1ff21b9d7304cf2b4327e Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Mon, 22 Dec 2014 17:43:42 -0800 Subject: [PATCH 05/22] Medium-sized refactor. Converted all methods away from old 'code' paradigm to use the PacketType and MessageType enums directly --- socketIO_client/__init__.py | 151 +++++++++++++++++++--------------- socketIO_client/transports.py | 38 +-------- 2 files changed, 87 insertions(+), 102 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 794e158..4552103 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -67,14 +67,6 @@ class BaseNamespace(object): 'Called after server disconnects; you can override this method' _log.debug('%s [disconnect]', self.path) - def on_heartbeat(self): - 'Called after server sends a heartbeat; you can override this method' - _log.debug('%s [heartbeat]', self.path) - - def on_message(self, data): - 'Called after server sends a message; you can override this method' - _log.info('%s [message] %s', self.path, data) - def on_event(self, event, *args): """ Called after server sends an event; you can override this method. @@ -88,14 +80,22 @@ class BaseNamespace(object): callback(*args) _log.info('%s [event] %s(%s)', self.path, event, ', '.join(arguments)) - def on_error(self, reason, advice): + def on_error(self, reason): 'Called after server sends an error; you can override this method' - _log.info('%s [error] %s', self.path, advice) + _log.info('%s [error] %s', self.path, reason) def on_noop(self): 'Called after server sends a noop; you can override this method' _log.info('%s [noop]', self.path) + def on_ping(self): + 'Called after server sends a ping; you can override this method' + _log.info('%s [ping]', self.path) + + def on_pong(self): + 'Called after server sends a pong; you can override this method' + _log.info('%s [pong]', self.path) + def on_open(self, *args): _log.info('%s [open] %s', self.path, args) @@ -249,13 +249,49 @@ class SocketIO(object): except PacketError as e: _log.warn('[packet error] %s', e) + def _get_message_delegate(self, code): + try: + return { + MessageType.CONNECT: self._on_connect, + MessageType.DISCONNECT: self._on_disconnect, + MessageType.EVENT: self._on_event, + MessageType.ACK: self._on_ack, + MessageType.ERROR: self._on_error, + MessageType.BINARY_EVENT: self._on_binary_event, + MessageType.BINARY_ACK: self._on_binary_ack + }[code] + except KeyError: + raise PacketError('unexpected code (%s)' % code) + + def _get_packet_delegate(self, code): + try: + return { + PacketType.OPEN: self._on_open, + PacketType.CLOSE: self._on_close, + PacketType.PING: self._on_ping, + PacketType.PONG: self._on_pong, + #PacketType.MESSAGE: self._on_message, Handled by other delegates + PacketType.UPGRADE: self._on_upgrade, + PacketType.NOOP: self._on_noop + }[code] + except KeyError: + raise PacketError('unexpected code (%s)' % code) + def _process_packet(self, packet): - code, packet_id, path, data, p = packet + _log.debug("[process packet] %s" % str(packet)); + path = packet.payload.path if packet.type == PacketType.MESSAGE else ""; namespace = self.get_namespace(path) + code = packet.payload.type if packet.type == PacketType.MESSAGE else packet.type; + delegate = None; try: - delegate = self._get_delegate(code) - except: + if packet.type == PacketType.MESSAGE: + _log.debug("[process packet] Handling message"); + delegate = self._get_message_delegate(packet.payload.type); + else: + delegate = self._get_packet_delegate(packet.type); + except Exception as e: + _log.warn("[process packet] Could not find delegate for packet: " + str(e)); pass; if delegate is not None: delegate(packet, namespace._find_event_callback) @@ -310,9 +346,7 @@ class SocketIO(object): websocket.send_packet(PacketType.PING, "", "probe"); for packet in websocket.recv_packet(): _log.debug("[websocket] Packet: %s" % str(packet)); - (code, packet_id, path, data, p) = packet; - if code == PacketType.PONG: - packet = p; + if packet.type == PacketType.PONG: _log.debug("[PONG] %s" % repr(packet)); self.heartbeat_terminator.set(); @@ -367,49 +401,39 @@ class SocketIO(object): except KeyError: raise PacketError('unexpected namespace path (%s)' % path) - def _get_delegate(self, code): - try: - return { - '0': self._on_disconnect, - '1': self._on_connect, - '2': self._on_heartbeat, - '3': self._on_message, - '4': self._on_json, - '5': self._on_event, - '6': self._on_ack, - '7': self._on_error, - '8': self._on_noop, - }[code] - except KeyError: - raise PacketError('unexpected code (%s)' % code) + ################################################################# + # Handlers for EngineIO packet types (PacketType in parser.py) + ################################################################# - def _on_disconnect(self, packet, find_event_callback): - find_event_callback('disconnect')() + def _on_open(self, packet, find_event_callback): + find_event_callback('open')() + + def _on_close(self, packet, find_event_callback): + find_event_callback('close')() + + def _on_ping(self, packet, find_event_callback): + find_event_callback('ping')() + + def _on_pong(self, packet, find_event_callback): + find_event_callback('pong')() + + def _on_upgrade(self, packet, find_event_callback): + find_event_callback('close')() + + def _on_noop(self, packet, find_event_callback): + find_event_callback('noop')() + + ################################################################# + # Handlers for SocketIO "packet" types (MessageType in parser.py) + ################################################################# def _on_connect(self, packet, find_event_callback): find_event_callback('connect')() - def _on_heartbeat(self, packet, find_event_callback): - find_event_callback('heartbeat')() - - def _on_message(self, packet, find_event_callback): - code, packet_id, path, data = packet - args = [data] - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback('message')(*args) - - def _on_json(self, packet, find_event_callback): - code, packet_id, path, data = packet - args = [json.loads(data)] - if packet_id: - args.append(self._prepare_to_send_ack(path, packet_id)) - find_event_callback('message')(*args) + def _on_disconnect(self, packet, find_event_callback): + find_event_callback('disconnect')() def _on_event(self, packet, find_event_callback): - code, packet_id, path, data, p = packet - packet = p; - # Accoding to the documentation # (https://github.com/automattic/socket.io-protocol#event), # the event name is the first entry in the message array, and @@ -421,29 +445,26 @@ class SocketIO(object): if packet.payload.id is not None: args.append(self._prepare_to_send_ack(path, packet_id)) - try: - self._namespace_by_path[packet.payload.path]._find_event_callback(event)(*args); - except KeyError: - _log.error("Could not handle event for unknown path: %s" % packet.payload.path); + find_event_callback(event)(*args); def _on_ack(self, packet, find_event_callback): - code, packet_id, path, data = packet - data_parts = data.split('+', 1) - packet_id = data_parts[0] + event = packet.payload.message[0]; + args = packet.payload.message[1:] if len(packet.payload.message) > 1 else []; + packet_id = packet.payload.id; try: ack_callback = self._transport.get_ack_callback(packet_id) except KeyError: return - args = json.loads(data_parts[1]) if len(data_parts) > 1 else [] ack_callback(*args) def _on_error(self, packet, find_event_callback): - code, packet_id, path, data = packet - reason, advice = data.split('+', 1) - find_event_callback('error')(reason, advice) + find_event_callback('error')(packet.payload.message) - def _on_noop(self, packet, find_event_callback): - find_event_callback('noop')() + def _on_binary_event(self, packet, find_event_callback): + raise PacketError("Don't know how to handle binary events yet"); + + def _on_binary_ack(self, packet, find_event_callback): + raise PacketError("Don't know how to handle binary acks yet"); def _prepare_to_send_ack(self, path, packet_id): 'Return function that acknowledges the server' diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 83eb019..32a03f1 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -51,8 +51,6 @@ class _AbstractTransport(object): while not responded: for packet in self.recv_packet(): _log.debug("[connect wait] Waiting for confirmation"); - (code, packet_id, ignore, data, p) = packet; - packet = p; if (packet.type == PacketType.MESSAGE and packet.payload.type == MessageType.CONNECT and packet.payload.path == path): @@ -99,41 +97,7 @@ class _AbstractTransport(object): except IndexError: pass for packet in self.recv(): - code, packet_id, path, data = None, None, '', None - - if packet.type is PacketType.OPEN: - code = '1'; - elif packet.type is PacketType.CLOSE: - code = '0'; - elif packet.type is PacketType.PING: - code = '2'; - elif packet.type is PacketType.PONG: - code = PacketType.PONG; - elif packet.type is PacketType.UPGRADE: - _log.warn("Don't know how to handle upgrade packets"); - yield code, packet_id, path, data, packet; - elif packet.type is PacketType.NOOP: - code = '8'; - elif packet.type is PacketType.MESSAGE: - if packet.payload.type is MessageType.CONNECT: - code = '1'; - elif packet.payload.type is MessageType.DISCONNECT: - code = '0'; - elif packet.payload.type is MessageType.EVENT: - code = '5'; - data = json.dumps({"name": packet.payload.message[0], "args": []}); - elif packet.payload.type is MessageType.ACK: - code = '6'; - elif packet.payload.type is MessageType.ERROR: - code = '7'; - else: - _log.warn("Don't know how to handle message type: %d" % packet.payload.type); - yield code, packet_id, path, data, packet; - else: - _log.warn("Don't know how to handle packet type: %d" % packet.type); - yield code, packet_id, path, data, packet; - - yield code, packet_id, path, data, packet + yield packet def _enqueue_packet(self, packet): self._packets.append(packet) From 5c1d38ac861d0a8d6f5ac279882ad18929aabb2d Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Mon, 22 Dec 2014 18:39:58 -0800 Subject: [PATCH 06/22] Implemented ack callbacks --- socketIO_client/__init__.py | 2 +- socketIO_client/parser.py | 63 +++++++++++++++++++++++++++-------- socketIO_client/transports.py | 26 +++++++++------ 3 files changed, 66 insertions(+), 25 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 4552103..05492d9 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -444,7 +444,7 @@ class SocketIO(object): _log.debug("[event] %s (%s)" % (repr(event), repr(args))); if packet.payload.id is not None: - args.append(self._prepare_to_send_ack(path, packet_id)) + args.append(self._prepare_to_send_ack(packet.payload.path, packet.payload.id)) find_event_callback(event)(*args); def _on_ack(self, packet, find_event_callback): diff --git a/socketIO_client/parser.py b/socketIO_client/parser.py index 3e657d0..8fe06b1 100644 --- a/socketIO_client/parser.py +++ b/socketIO_client/parser.py @@ -32,6 +32,27 @@ class Packet(): def __str__(self): return "PACKET{type: " + str(self.type) + ", payload: " + str(self.payload) + "}"; + def encode_as_string(self, for_websocket = False): + data = ""; + path = ""; + if self.type == PacketType.MESSAGE: + data = self.payload.encode_as_string(); + path = self.payload.path; + else: + data = self.payload; + + code_length = len(str(self.type)); + data_length = len(data); + length = code_length + data_length; + + encoded = ""; + if for_websocket: + encoded = str(self.type) + str(data); + else: + encoded = str(length) + ":" + str(self.type) + str(data); + + return encoded; + class Message(): def __init__(self, message_type, message, path = "", attachments = "", message_id = None): self.type = message_type; @@ -67,15 +88,30 @@ class Message(): Assumes the message payload will be dumped as a json string. """ + data = json.dumps(self.message); + if self.id is not None: + data = str(self.id) + json.dumps(self.message); + if self.path == "": - return str(self.type) + json.dumps(self.message); - return str(self.type) + self.path + "," + json.dumps(self.message); + return str(self.type) + data; + return str(self.type) + self.path + "," + data; def encode_as_string(self): """Same as the encode_as_string method except it doesn't encode things as a JSON string""" + data = self.message; + if self.id is not None: + data = str(self.id) + self.message; + if self.path == "": - return str(self.type) + self.message; - return str(self.type) + self.path + "," + self.message; + return str(self.type) + data; + return str(self.type) + self.path + "," + data; + +def _is_integer(s): + try: + int(s); + except ValueError: + return False; + return True; def decode_message(payload): """ Decodes a message encoded via socket.io @@ -112,11 +148,15 @@ def decode_message(payload): i += len(path); if len(payload) > i: - # This is the same pecularity as above. - if "," in payload[i:]: - split_point = payload.index(","); - message_id = int(payload[i:split_point]); - i += split_point; + # This is another oddity. According to the socket.io-parser we + # need to loop over the next chars until we stop finding ints + # to determine if there is a message id. + message_id_str = ""; + while _is_integer(payload[i]): + message_id_str += payload[i]; + i += 1; + if message_id_str != "": + message_id = int(message_id_str); if len(payload) > i: message = payload[i:]; @@ -198,8 +238,3 @@ def encode_packet_string(code, path, data): """Encodes packet to be sent to socket.io server. """ - code_length = len(str(code)); - data_length = len(data); - length = code_length + data_length; - - return str(length) + ":" + str(code) + str(data); diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 32a03f1..65be157 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -1,7 +1,7 @@ import json import logging import parser -from parser import Message, MessageType, PacketType +from parser import Message, Packet, MessageType, PacketType import re import requests import six @@ -75,18 +75,16 @@ class _AbstractTransport(object): self.send_packet(PacketType.MESSAGE, path, message.encode_as_json(), callback) def ack(self, path, packet_id, *args): - packet_id = packet_id.rstrip('+') - data = '%s+%s' % ( - packet_id, - json.dumps(args, ensure_ascii=False), - ) if args else packet_id - #self.send_packet(6, path, data) + _log.debug("[ack] Sending ACK for packet: %d" % packet_id); + message = Message(MessageType.ACK, "", path, "", packet_id); + packet = Packet(PacketType.MESSAGE, message); + self.send_engineio_packet(packet) def noop(self, path=''): self.send_packet(PacketType.NOOP, path) def send_packet(self, code, path='', data='', callback=None): - packet_text = parser.encode_packet_string(code, path, data); + packet_text = Packet(code, data).encode_as_string(); self.send(packet_text) _log.debug('[packet sent] %s', packet_text) @@ -140,11 +138,19 @@ class WebsocketTransport(_AbstractTransport): def connected(self): return self._connection.connected - def send_packet(self, code, path="", data='', callback=None): - packet_text = Message(code, data).encode_as_string(); + def send_message(self, message, callback = None): + packet_text = message.encode_as_string(); self.send(packet_text) _log.debug('[packet sent] %s', packet_text) + def send_engineio_packet(self, packet, callback=None): + packet_text = packet.encode_as_string(for_websocket = True); + self.send(packet_text) + _log.debug('[packet sent] %s', packet_text) + + def send_packet(self, code, path="", data='', callback=None): + self.send_message(Message(code, data), callback); + def send(self, packet_text): try: self._connection.send(packet_text) From 2d1257bf8f241a2b831198c1b92bf3d4e625b7f1 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Mon, 22 Dec 2014 21:46:05 -0800 Subject: [PATCH 07/22] Client now attempts to reconnect forever on server disconnect and correctly reconnects --- socketIO_client/__init__.py | 55 ++++++++++++++++++++++++++--------- socketIO_client/parser.py | 5 ++-- socketIO_client/transports.py | 19 ++++++------ 3 files changed, 55 insertions(+), 24 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 05492d9..30a8643 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -145,19 +145,26 @@ class SocketIO(object): self.wait_for_connection = wait_for_connection self._namespace_by_path = {} self.kw = kw + + self.__transport = None; + # These two fields work to control the heartbeat thread. self.heartbeat_terminator = None; self.heartbeat_thread = None; + # Saved session information. self.session = None; + # This is stores the set of paths (namespaces) that need to be # reconnected to. self.reconnect_paths = {}; + # This sets of a chain of events that attempts to connect to # the server at the base namespace. self.define(Namespace) def __enter__(self): + _log.debug("[enter]"); return self def __exit__(self, *exception_pack): @@ -193,9 +200,19 @@ class SocketIO(object): self._transport.emit(path, event, args, callback) def reconnect(self): - """Reconnects to a set of namespaces. + """Reconnects the client. + + Reconnects to the server and connects to the previously + connected set of namespaces. """ + _log.debug(" [reconnect attempt]"); + + # Reconnect to the server. + if self.__transport is not None: + self.__transport.close(); + self.__transport = self._get_transport(); + for path in self.reconnect_paths: # We avoid reconnecting to the default namespace because # socketIO_client connects to that already. @@ -203,6 +220,10 @@ class SocketIO(object): continue; _log.debug("Reconnecting to path: %s" % repr(path)) self._transport.connect(path); + # Restore paths. + self._namespace_by_path = copy.copy(self.reconnect_paths); + for namespace in self._namespace_by_path: + self._namespace_by_path[namespace]._transport = self.__transport; self.reconnect_paths = {}; def wait(self, seconds=None, for_callbacks=False): @@ -212,6 +233,15 @@ class SocketIO(object): """ warning_screen = _yield_warning_screen(seconds) for elapsed_time in warning_screen: + # We will end up here in the case that we + # disconnected. + if len(self.reconnect_paths) > 0: + try: + self.reconnect(); + except ConnectionError as e: + time.sleep(1); + continue; + try: try: self._process_events() @@ -220,26 +250,23 @@ class SocketIO(object): if self._stop_waiting(for_callbacks): break - # We will end up here in the case that we - # disconnected, then reconnected AND we were - # successful. - if len(self.reconnect_paths) > 0: - self.reconnect(); except ConnectionError as e: try: # This is where we end up if the connection was # severed. The client will disconnect here. if len(self.reconnect_paths) is 0: - self.reconnect_paths = copy.deepcopy(self._namespace_by_path); + self.reconnect_paths = copy.copy(self._namespace_by_path); - self._terminate_heartbeat(); + self._terminate_heartbeat(); + + for namespace in self.reconnect_paths: + self.disconnect(namespace); warning = Exception('[connection error] %s' % e) - self._transport._connected = False; warning_screen.throw(warning) except StopIteration: _log.warn(warning) - self.disconnect() + _log.debug("[wait canceled]"); def _process_events(self): @@ -317,18 +344,19 @@ class SocketIO(object): @property def connected(self): - return self.__transport.connected + return self.__transport.connected if self.__transport is not None else False; @property def _transport(self): try: - if self.connected: + if self.__transport is not None and self.connected: return self.__transport except AttributeError: pass warning_screen = _yield_warning_screen(seconds=None) for elapsed_time in warning_screen: try: + _log.debug("[create transport]"); self.__transport = self._get_transport() break except ConnectionError as e: @@ -349,7 +377,7 @@ class SocketIO(object): if packet.type == PacketType.PONG: _log.debug("[PONG] %s" % repr(packet)); - self.heartbeat_terminator.set(); + self._terminate_heartbeat(); # Technically we would need to pause the current # transport (which should be polling in this @@ -391,6 +419,7 @@ class SocketIO(object): try: return self._upgrade(); except: + _log.warn("[websocket] Failed to upgrade to websocket") pass; return transport diff --git a/socketIO_client/parser.py b/socketIO_client/parser.py index 8fe06b1..39fcb3d 100644 --- a/socketIO_client/parser.py +++ b/socketIO_client/parser.py @@ -35,7 +35,7 @@ class Packet(): def encode_as_string(self, for_websocket = False): data = ""; path = ""; - if self.type == PacketType.MESSAGE: + if self.type == PacketType.MESSAGE and not isinstance(self.payload, basestring): data = self.payload.encode_as_string(); path = self.payload.path; else: @@ -230,8 +230,9 @@ def decode_packet(packet): message = decode_message(payload); payload = message; - return offset + length, Packet(packet_type, payload); + return offset + length - 1, Packet(packet_type, payload); else: + import ipdb; ipdb.set_trace(); pass; def encode_packet_string(code, path, data): diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 65be157..2c70cf4 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -41,7 +41,7 @@ class _AbstractTransport(object): self.close() def connect(self, path): - if path != "": + if True or path != "": _log.debug("Connecting to path: %s" % path); data = Message(MessageType.CONNECT, path).encode_as_string(); self.send_packet(PacketType.MESSAGE, path, data); @@ -50,11 +50,11 @@ class _AbstractTransport(object): responded = False; while not responded: for packet in self.recv_packet(): - _log.debug("[connect wait] Waiting for confirmation"); + _log.debug("[connect wait] Waiting for confirmation of connect to: %s" % path); if (packet.type == PacketType.MESSAGE and packet.payload.type == MessageType.CONNECT and packet.payload.path == path): - _log.debug("Connected to path: %s" % path); + _log.debug("[connect] Connected to path: %s" % path); responded = True; else: self.send_packet(PacketType.OPEN, path, data); @@ -78,7 +78,7 @@ class _AbstractTransport(object): _log.debug("[ack] Sending ACK for packet: %d" % packet_id); message = Message(MessageType.ACK, "", path, "", packet_id); packet = Packet(PacketType.MESSAGE, message); - self.send_engineio_packet(packet) + self.send_engineio_packet(packet); def noop(self, path=''): self.send_packet(PacketType.NOOP, path) @@ -86,7 +86,6 @@ class _AbstractTransport(object): def send_packet(self, code, path='', data='', callback=None): packet_text = Packet(code, data).encode_as_string(); self.send(packet_text) - _log.debug('[packet sent] %s', packet_text) def recv_packet(self): try: @@ -127,6 +126,7 @@ class WebsocketTransport(_AbstractTransport): base_url, parser.ENGINE_PROTOCOL, socketIO_session.id) try: + _log.debug("[websocket] Connecting"); self._connection = websocket.create_connection(self._url) except socket.timeout as e: raise ConnectionError(e) @@ -141,17 +141,16 @@ class WebsocketTransport(_AbstractTransport): def send_message(self, message, callback = None): packet_text = message.encode_as_string(); self.send(packet_text) - _log.debug('[packet sent] %s', packet_text) def send_engineio_packet(self, packet, callback=None): packet_text = packet.encode_as_string(for_websocket = True); self.send(packet_text) - _log.debug('[packet sent] %s', packet_text) def send_packet(self, code, path="", data='', callback=None): self.send_message(Message(code, data), callback); def send(self, packet_text): + _log.debug("[websocket] send: " + str(packet_text)); try: self._connection.send(packet_text) except websocket.WebSocketTimeoutException as e: @@ -186,7 +185,8 @@ class WebsocketTransport(_AbstractTransport): raise ConnectionError(e) def close(self): - self._connection.close() + self._connection.close(); + self._connected = False; class XHR_PollingTransport(_AbstractTransport): @@ -207,8 +207,9 @@ class XHR_PollingTransport(_AbstractTransport): @property def _params(self): return dict(t=int(time.time() * 1000)) - + def send(self, packet_text): + _log.debug("[xhr] send: " + str(packet_text)); uri = self._url + "&" + '&'.join("%s=%s" % (k, v) for (k, v) in self._params.iteritems()); response = None; try: From f4a96c72d036292ce1d5111b59843a06fc90f393 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Mon, 22 Dec 2014 22:08:15 -0800 Subject: [PATCH 08/22] Updates to get XHR polling working correctly. Mostly encode/decode issues. --- socketIO_client/__init__.py | 3 +-- socketIO_client/parser.py | 5 ----- socketIO_client/transports.py | 27 ++++++++++++++++++++------- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 30a8643..fd9ebf7 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -178,7 +178,6 @@ class SocketIO(object): def _terminate_heartbeat(self): if self.heartbeat_terminator is not None: self.heartbeat_terminator.set(); - #time.sleep(self.session.heartbeat_interval); self.heartbeat_thread.join(); def define(self, Namespace, path=''): @@ -239,7 +238,7 @@ class SocketIO(object): try: self.reconnect(); except ConnectionError as e: - time.sleep(1); + time.sleep(RETRY_INTERVAL_IN_SECONDS); continue; try: diff --git a/socketIO_client/parser.py b/socketIO_client/parser.py index 39fcb3d..4edd487 100644 --- a/socketIO_client/parser.py +++ b/socketIO_client/parser.py @@ -234,8 +234,3 @@ def decode_packet(packet): else: import ipdb; ipdb.set_trace(); pass; - -def encode_packet_string(code, path, data): - """Encodes packet to be sent to socket.io server. - """ - diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 2c70cf4..b13738a 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -50,12 +50,15 @@ class _AbstractTransport(object): responded = False; while not responded: for packet in self.recv_packet(): - _log.debug("[connect wait] Waiting for confirmation of connect to: %s" % path); - if (packet.type == PacketType.MESSAGE - and packet.payload.type == MessageType.CONNECT - and packet.payload.path == path): - _log.debug("[connect] Connected to path: %s" % path); - responded = True; + if not responded: + _log.debug("[connect wait] Waiting for confirmation of connect to: %s" % path); + if (packet.type == PacketType.MESSAGE + and packet.payload.type == MessageType.CONNECT + and packet.payload.path == path): + _log.debug("[connect] Connected to path: %s" % path); + responded = True; + else: + self._packets.append(packet); else: self.send_packet(PacketType.OPEN, path, data); @@ -167,7 +170,7 @@ class WebsocketTransport(_AbstractTransport): response = self._connection.recv(); try: for packet in parser.decode_response(response): - _log.debug('[websocket packet received] %s', str(packet)); + _log.debug('[websocket] Packet received: %s', str(packet)); yield packet; except AttributeError: _log.warn('[packet error] %s', repr(response)) @@ -207,6 +210,10 @@ class XHR_PollingTransport(_AbstractTransport): @property def _params(self): return dict(t=int(time.time() * 1000)) + + def send_engineio_packet(self, packet, callback=None): + packet_text = packet.encode_as_string(for_websocket = False); + self.send(packet_text) def send(self, packet_text): _log.debug("[xhr] send: " + str(packet_text)); @@ -233,6 +240,11 @@ class XHR_PollingTransport(_AbstractTransport): if self._waiting: return; + # Yield any packets that were not processed before. + for packet in self._packets: + _log.debug('[xhr] Packet received: %s', str(packet)); + yield packet; + self._waiting = True; response = _get_response( self._http_session.get, @@ -245,6 +257,7 @@ class XHR_PollingTransport(_AbstractTransport): return; for packet in parser.decode_response(response): + _log.debug('[xhr] Packet received: %s', str(packet)); yield packet; return From bdb92f8da6a642ce5f54c5a6c2990c3ef08f648c Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Tue, 23 Dec 2014 02:18:33 -0800 Subject: [PATCH 09/22] Added 4 space indents to server_tests.js. Also added less ambiguous method definitions for multiple arg methods --- serve_tests.js | 130 ++++++++++++++++++++++++------------------------- 1 file changed, 65 insertions(+), 65 deletions(-) diff --git a/serve_tests.js b/serve_tests.js index 6c2edfc..90c58cf 100644 --- a/serve_tests.js +++ b/serve_tests.js @@ -1,79 +1,79 @@ var io = require('socket.io').listen(8000); var main = io.of('').on('connection', function(socket) { - socket.on('message', function(data, fn) { - if (fn) { // Client expects a callback - if (data) { - fn(data); - } else { - fn(); - } - } else if (typeof data === 'object') { - socket.json.send(data ? data : 'message_response'); // object or null - } else { - socket.send(data ? data : 'message_response'); // string or '' - } - }); - socket.on('emit', function() { - socket.emit('emit_response'); - }); - socket.on('emit_with_payload', function(payload) { - socket.emit('emit_with_payload_response', payload); - }); - socket.on('emit_with_multiple_payloads', function(payload, payload) { - socket.emit('emit_with_multiple_payloads_response', payload, payload); - }); - socket.on('emit_with_callback', function(fn) { - fn(); - }); - socket.on('emit_with_callback_with_payload', function(fn) { - fn(PAYLOAD); - }); - socket.on('emit_with_callback_with_multiple_payloads', function(fn) { - fn(PAYLOAD, PAYLOAD); - }); - socket.on('emit_with_event', function(payload) { - socket.emit('emit_with_event_response', payload); - }); - socket.on('ack', function(payload) { - socket.emit('ack_response', payload, function(payload) { - socket.emit('ack_callback_response', payload); + socket.on('message', function(data, fn) { + if (fn) { // Client expects a callback + if (data) { + fn(data); + } else { + fn(); + } + } else if (typeof data === 'object') { + socket.json.send(data ? data : 'message_response'); // object or null + } else { + socket.send(data ? data : 'message_response'); // string or '' + } + }); + socket.on('emit', function() { + socket.emit('emit_response'); + }); + socket.on('emit_with_payload', function(payload) { + socket.emit('emit_with_payload_response', payload); + }); + socket.on('emit_with_multiple_payloads', function(payload1, payload2) { + socket.emit('emit_with_multiple_payloads_response', payload1, payload2); + }); + socket.on('emit_with_callback', function(fn) { + fn(); + }); + socket.on('emit_with_callback_with_payload', function(fn) { + fn(PAYLOAD); + }); + socket.on('emit_with_callback_with_multiple_payloads', function(fn) { + fn(PAYLOAD, PAYLOAD); + }); + socket.on('emit_with_event', function(payload) { + socket.emit('emit_with_event_response', payload); + }); + socket.on('ack', function(payload) { + socket.emit('ack_response', payload, function(payload) { + socket.emit('ack_callback_response', payload); + }); + }); + socket.on('aaa', function() { + socket.emit('aaa_response', PAYLOAD); + }); + socket.on('bbb', function(payload, fn) { + if (fn) { + fn(payload); + } + }); + socket.on('wait_with_disconnect', function() { + socket.emit('wait_with_disconnect_response'); }); - }); - socket.on('aaa', function() { - socket.emit('aaa_response', PAYLOAD); - }); - socket.on('bbb', function(payload, fn) { - if (fn) { - fn(payload); - } - }); - socket.on('wait_with_disconnect', function() { - socket.emit('wait_with_disconnect_response'); - }); }); var chat = io.of('/chat').on('connection', function (socket) { - socket.on('emit_with_payload', function(payload) { - socket.emit('emit_with_payload_response', payload); - }); - socket.on('aaa', function() { - socket.emit('aaa_response', 'in chat'); - }); - socket.on('ack', function(payload) { - socket.emit('ack_response', payload, function(payload) { - socket.emit('ack_callback_response', payload); + socket.on('emit_with_payload', function(payload) { + socket.emit('emit_with_payload_response', payload); + }); + socket.on('aaa', function() { + socket.emit('aaa_response', 'in chat'); + }); + socket.on('ack', function(payload) { + socket.emit('ack_response', payload, function(payload) { + socket.emit('ack_callback_response', payload); + }); }); - }); }); var news = io.of('/news').on('connection', function (socket) { - socket.on('emit_with_payload', function(payload) { - socket.emit('emit_with_payload_response', payload); - }); - socket.on('aaa', function() { - socket.emit('aaa_response', 'in news'); - }); + socket.on('emit_with_payload', function(payload) { + socket.emit('emit_with_payload_response', payload); + }); + socket.on('aaa', function() { + socket.emit('aaa_response', 'in news'); + }); }); var PAYLOAD = {'xxx': 'yyy'}; From 66e563acc31c8e4f22b365668dec71d2e798d8c8 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Tue, 23 Dec 2014 02:19:41 -0800 Subject: [PATCH 10/22] Updated implementation to allow callbacks and arg handling that is consistent with reference javascript implementation. --- socketIO_client/__init__.py | 33 ++++++++++++++++++++--------- socketIO_client/transports.py | 40 +++++++++++++++++++---------------- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index fd9ebf7..8196327 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -27,7 +27,6 @@ _log = logging.getLogger(__name__) PROTOCOL_VERSION = 1 RETRY_INTERVAL_IN_SECONDS = 1 - class BaseNamespace(object): 'Define client behavior' @@ -46,11 +45,7 @@ class BaseNamespace(object): def emit(self, event, *args, **kw): callback, args = find_callback(args, kw) - - if callback is not None: - _log.warn("Callback was specified but is not supported."); - - self._transport.emit(self.path, event, args, None) + self._transport.emit(self.path, event, args, callback) def disconnect(self): self._transport.disconnect(self.path) @@ -231,7 +226,7 @@ class SocketIO(object): - Omit seconds, i.e. call wait() without arguments, to wait forever. """ warning_screen = _yield_warning_screen(seconds) - for elapsed_time in warning_screen: + for elapsed_time in warning_screen: # We will end up here in the case that we # disconnected. if len(self.reconnect_paths) > 0: @@ -266,6 +261,7 @@ class SocketIO(object): except StopIteration: _log.warn(warning) + self._terminate_heartbeat(); _log.debug("[wait canceled]"); def _process_events(self): @@ -476,13 +472,30 @@ class SocketIO(object): find_event_callback(event)(*args); def _on_ack(self, packet, find_event_callback): - event = packet.payload.message[0]; - args = packet.payload.message[1:] if len(packet.payload.message) > 1 else []; + """Handles ACK from server. + + There are two types of ACKs. The first is when this client + requests that the server responds with an ACK upon execution + of a remote function (specified in the server via + socketio.on()). + + The second type is when the server requests that this client + acknowledges that a local function has been executed. + + Both are handled the same way from the client's standpoint, + but in latter case the server will actually send along the + event name and the args, but they are currently ignored. + + """ + + #event = packet.payload.message[0]; packet_id = packet.payload.id; try: - ack_callback = self._transport.get_ack_callback(packet_id) + ack_callback = self._transport.get_ack_callback(str(packet_id)) except KeyError: + _log.warn("Could not find callback function for packet id: %d" % packet_id); return + args = packet.payload.message[1:] if len(packet.payload.message) > 1 else []; ack_callback(*args) def _on_error(self, packet, find_event_callback): diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index b13738a..18e27ae 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -65,28 +65,30 @@ class _AbstractTransport(object): def send_heartbeat(self): self.send_packet(PacketType.PING) - def message(self, path, data, callback): - if isinstance(data, basestring): - code = 3 - else: - code = 4 - data = json.dumps(data, ensure_ascii=False) - self.send_packet(code, path, data, callback) - def emit(self, path, event, args, callback): - message = Message(MessageType.EVENT, [event, args], path); - self.send_packet(PacketType.MESSAGE, path, message.encode_as_json(), callback) + message_id = self.set_ack_callback(callback) if callback else None; + message = ""; + if len(args) > 0: + data = list(args); + data.insert(0, event); + message = Message(MessageType.EVENT, data, path, message_id = message_id); + else: + message = Message(MessageType.EVENT, [event], path, message_id = message_id); + self.send_packet(PacketType.MESSAGE, path, message.encode_as_json()) def ack(self, path, packet_id, *args): _log.debug("[ack] Sending ACK for packet: %d" % packet_id); - message = Message(MessageType.ACK, "", path, "", packet_id); + data = ""; + if len(args) > 0: + data = args; + message = Message(MessageType.ACK, data, path, "", packet_id); packet = Packet(PacketType.MESSAGE, message); self.send_engineio_packet(packet); def noop(self, path=''): self.send_packet(PacketType.NOOP, path) - def send_packet(self, code, path='', data='', callback=None): + def send_packet(self, code, path='', data=''): packet_text = Packet(code, data).encode_as_string(); self.send(packet_text) @@ -105,11 +107,13 @@ class _AbstractTransport(object): def set_ack_callback(self, callback): 'Set callback to be called after server sends an acknowledgment' self._packet_id += 1 + _log.debug("Setting ACK for packet id: %d (%s) [%s]" % (self._packet_id, str(callback), str(self))); self._callback_by_packet_id[str(self._packet_id)] = callback - return '%s+' % self._packet_id + return self._packet_id def get_ack_callback(self, packet_id): 'Get callback to be called after server sends an acknowledgment' + _log.debug("Searching for ACK for packet id: %s [%s]" % (packet_id, str(self))); callback = self._callback_by_packet_id[packet_id] del self._callback_by_packet_id[packet_id] return callback @@ -141,16 +145,16 @@ class WebsocketTransport(_AbstractTransport): def connected(self): return self._connection.connected - def send_message(self, message, callback = None): + def send_message(self, message): packet_text = message.encode_as_string(); self.send(packet_text) - def send_engineio_packet(self, packet, callback=None): + def send_engineio_packet(self, packet): packet_text = packet.encode_as_string(for_websocket = True); self.send(packet_text) - def send_packet(self, code, path="", data='', callback=None): - self.send_message(Message(code, data), callback); + def send_packet(self, code, path="", data=''): + self.send_message(Message(code, data)); def send(self, packet_text): _log.debug("[websocket] send: " + str(packet_text)); @@ -211,7 +215,7 @@ class XHR_PollingTransport(_AbstractTransport): def _params(self): return dict(t=int(time.time() * 1000)) - def send_engineio_packet(self, packet, callback=None): + def send_engineio_packet(self, packet): packet_text = packet.encode_as_string(for_websocket = False); self.send(packet_text) From 6f8e76adfd6cdeaafc039bdd91193e086ee8ac80 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Tue, 23 Dec 2014 02:20:26 -0800 Subject: [PATCH 11/22] Added check to parser encoder to automatically encode json if the message data is not a string --- socketIO_client/parser.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/socketIO_client/parser.py b/socketIO_client/parser.py index 4edd487..27a2d7f 100644 --- a/socketIO_client/parser.py +++ b/socketIO_client/parser.py @@ -98,6 +98,8 @@ class Message(): def encode_as_string(self): """Same as the encode_as_string method except it doesn't encode things as a JSON string""" + if not isinstance(self.message, basestring): + return self.encode_as_json(); data = self.message; if self.id is not None: data = str(self.id) + self.message; From 2361706c4e92e00e7e30418228767d3b8ecfad7c Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Tue, 23 Dec 2014 02:24:52 -0800 Subject: [PATCH 12/22] Updated tests slightly for newer client implementation. Tests on 'message'-like methods have been removed as the newer paradigm considers message and events the same. Timeouts on the non-websocket transports have been lowered to enable faster tests. Explicit transports have been removed from constructors since that has also been removed from the code. --- socketIO_client/tests.py | 60 ++++++++-------------------------------- 1 file changed, 11 insertions(+), 49 deletions(-) diff --git a/socketIO_client/tests.py b/socketIO_client/tests.py index dfebecb..ea20282 100644 --- a/socketIO_client/tests.py +++ b/socketIO_client/tests.py @@ -3,8 +3,6 @@ import time from unittest import TestCase from . import SocketIO, BaseNamespace, find_callback -from .transports import TIMEOUT_IN_SECONDS - HOST = 'localhost' PORT = 8000 @@ -42,39 +40,6 @@ class BaseMixin(object): self.assertTrue(namespace.called_on_disconnect) self.assertFalse(self.socketIO.connected) - def test_message(self): - 'Message' - namespace = self.socketIO.define(Namespace) - self.socketIO.message() - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, 'message_response') - - def test_message_with_data(self): - 'Message with data' - namespace = self.socketIO.define(Namespace) - self.socketIO.message(DATA) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, DATA) - - def test_message_with_payload(self): - 'Message with payload' - namespace = self.socketIO.define(Namespace) - self.socketIO.message(PAYLOAD) - self.socketIO.wait(self.wait_time_in_seconds) - self.assertEqual(namespace.response, PAYLOAD) - - def test_message_with_callback(self): - 'Message with callback' - self.socketIO.message(callback=self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - - def test_message_with_callback_with_data(self): - 'Message with callback with data' - self.socketIO.message(DATA, self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) - self.assertTrue(self.called_on_response) - def test_emit(self): 'Emit' namespace = self.socketIO.define(Namespace) @@ -104,22 +69,22 @@ class BaseMixin(object): def test_emit_with_callback(self): 'Emit with callback' - self.socketIO.emit('emit_with_callback', self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) + self.socketIO.emit('emit_with_callback', callback = self.on_response) + self.socketIO.wait_for_callbacks(seconds = self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_emit_with_callback_with_payload(self): 'Emit with callback with payload' self.socketIO.emit( 'emit_with_callback_with_payload', self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) + self.socketIO.wait_for_callbacks(seconds = self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_emit_with_callback_with_multiple_payloads(self): 'Emit with callback with multiple payloads' self.socketIO.emit( 'emit_with_callback_with_multiple_payloads', self.on_response) - self.socketIO.wait_for_callbacks(seconds=self.wait_time_in_seconds) + self.socketIO.wait_for_callbacks(seconds = self.wait_time_in_seconds) self.assertTrue(self.called_on_response) def test_emit_with_event(self): @@ -171,30 +136,27 @@ class BaseMixin(object): 'ack_callback_response': (PAYLOAD,), }) - -class Test_WebsocketTransport(TestCase, BaseMixin): +class Test_WebsocketTransport(BaseMixin, TestCase): def setUp(self): super(Test_WebsocketTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['websocket']) + self.socketIO = SocketIO(HOST, PORT) self.wait_time_in_seconds = 0.1 -class Test_XHR_PollingTransport(TestCase, BaseMixin): +class Test_XHR_PollingTransport(BaseMixin, TestCase): def setUp(self): super(Test_XHR_PollingTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['xhr-polling']) - self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 - + self.socketIO = SocketIO(HOST, PORT) + self.wait_time_in_seconds = 1 class Test_JSONP_PollingTransport(TestCase, BaseMixin): def setUp(self): super(Test_JSONP_PollingTransport, self).setUp() - self.socketIO = SocketIO(HOST, PORT, transports=['jsonp-polling']) - self.wait_time_in_seconds = TIMEOUT_IN_SECONDS + 1 - + self.socketIO = SocketIO(HOST, PORT, transports = ['jsonp-polling']) + self.wait_time_in_seconds = 1; class Namespace(BaseNamespace): From 455890a7000175826d0d2b680661c5aa3afab2e1 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Tue, 23 Dec 2014 02:37:53 -0800 Subject: [PATCH 13/22] Added more documentation --- socketIO_client/__init__.py | 47 +++++++++++++++++++++++++++++++------ socketIO_client/parser.py | 27 +++++++++++++++++---- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 8196327..a2b6c8c 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -16,7 +16,6 @@ except ImportError: from .exceptions import ConnectionError, TimeoutError, PacketError from .transports import _get_response - _SocketIOSession = namedtuple('_SocketIOSession', [ 'id', 'heartbeat_interval', @@ -24,7 +23,6 @@ _SocketIOSession = namedtuple('_SocketIOSession', [ 'server_supported_transports', ]) _log = logging.getLogger(__name__) -PROTOCOL_VERSION = 1 RETRY_INTERVAL_IN_SECONDS = 1 class BaseNamespace(object): @@ -171,6 +169,9 @@ class SocketIO(object): self._terminate_heartbeat(); def _terminate_heartbeat(self): + """Terminates the heartbeat thread. + + """ if self.heartbeat_terminator is not None: self.heartbeat_terminator.set(); self.heartbeat_thread.join(); @@ -365,6 +366,21 @@ class SocketIO(object): return self.__transport def _upgrade(self): + """Attempts to upgrade the connection to a websocket. + + This method will execute the update process outline here: + https://github.com/Automattic/engine.io-protocol#transport-upgrading + + To summarize, we first send a PING packet with the string + 'probe' appended as data. This signals to the server that we + want to probe the ability to upgrade. If the server has this + functionality, it responds with a PONG packet and the 'probe' + string. + + We then send an UPGRADE packet, restart the heartbeat thread, + and return. + + """ websocket = transports.WebsocketTransport(self.session, self.is_secure, self.base_url, **self.kw); websocket.send_packet(PacketType.PING, "", "probe"); for packet in websocket.recv_packet(): @@ -384,6 +400,19 @@ class SocketIO(object): return websocket; def _start_heartbeat(self, transport): + """Starts the heartbeat thread. + + The heartbeat thread ensures that our connection is never + severed. This effectively spawns a thread that sits in an + infinite loop. The thread waits + self.session.heartbeat_interval / 2 (gleaned from the server) + seconds, then sends a heartbeat packet (PING). + + The thread is implemented using a multiprocessing.Event to + perform the wait, so it doesn't waste any cpu cycles while + it's waiting. + + """ _log.debug("[start heartbeat pacemaker]"); self.heartbeat_terminator = multiprocessing.Event(); self.heartbeat_thread = multiprocessing.Process( @@ -458,10 +487,14 @@ class SocketIO(object): find_event_callback('disconnect')() def _on_event(self, packet, find_event_callback): - # Accoding to the documentation - # (https://github.com/automattic/socket.io-protocol#event), - # the event name is the first entry in the message array, and - # the arguments are the rest of the entries. + """This delegate is called when there is an EVENT packet. + + Accoding to the documentation + (https://github.com/automattic/socket.io-protocol#event), the + event name is the first entry in the message array, and the + arguments are the rest of the entries. + + """ event = packet.payload.message[0]; args = packet.payload.message[1:] if len(packet.payload.message) > 1 else []; @@ -556,7 +589,7 @@ def _yield_elapsed_time(seconds=None): def _get_socketIO_session(is_secure, base_url, **kw): server_url = '%s://%s/?EIO=%d&transport=polling' \ - % ('https' if is_secure else 'http', base_url, parser.ENGINE_PROTOCOL) + % ('https' if is_secure else 'http', base_url, parser.ENGINEIO_PROTOCOL) _log.debug('[session] %s', server_url) try: response = _get_response(requests.get, server_url, **kw) diff --git a/socketIO_client/parser.py b/socketIO_client/parser.py index 27a2d7f..070cf14 100644 --- a/socketIO_client/parser.py +++ b/socketIO_client/parser.py @@ -4,7 +4,7 @@ import json _log = logging.getLogger(__name__) -ENGINE_PROTOCOL = 3; +ENGINEIO_PROTOCOL = 3; class PacketType(Enum): OPEN = 0; @@ -25,6 +25,8 @@ class MessageType(Enum): BINARY_ACK = 6; class Packet(): + """ Represents a 'packet' from the engine.io protocol. + """ def __init__(self, packet_type, payload): self.type = packet_type; self.payload = payload; @@ -33,6 +35,16 @@ class Packet(): return "PACKET{type: " + str(self.type) + ", payload: " + str(self.payload) + "}"; def encode_as_string(self, for_websocket = False): + """Returns the packet encoded according to the engine.io-protocol. + + Reference: (https://github.com/Automattic/engine.io-protocol). + + It's worth noting that websockets have their own framing and + encoding mechanism so if the packet is going to be transmitted + via websockets, we encode it slightly differently per the + documentation. + + """ data = ""; path = ""; if self.type == PacketType.MESSAGE and not isinstance(self.payload, basestring): @@ -54,6 +66,8 @@ class Packet(): return encoded; class Message(): + """Represents a 'message' from the socket.io protocol. + """ def __init__(self, message_type, message, path = "", attachments = "", message_id = None): self.type = message_type; if isinstance(message, basestring): @@ -84,9 +98,8 @@ class Message(): "}"; def encode_as_json(self): - """Encodes a Message to be sent to socket.io server. - - Assumes the message payload will be dumped as a json string. + """Encodes a JSON Message to be sent to socket.io server. + """ data = json.dumps(self.message); if self.id is not None: @@ -97,7 +110,11 @@ class Message(): return str(self.type) + self.path + "," + data; def encode_as_string(self): - """Same as the encode_as_string method except it doesn't encode things as a JSON string""" + """Encodes a Message to be to a socket.io server. + + This will call encode_as_json if the message is not a string. + + """ if not isinstance(self.message, basestring): return self.encode_as_json(); data = self.message; From 6932d4c2d1b89d57a4c6524d5526f2c064cdf908 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Tue, 23 Dec 2014 19:15:50 -0800 Subject: [PATCH 14/22] Added connection error handling for all send / recv related methods so that we can reconnect on all connection errors --- socketIO_client/__init__.py | 103 ++++++++++++++++++++++-------------- 1 file changed, 64 insertions(+), 39 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index a2b6c8c..5026cb9 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -28,8 +28,8 @@ RETRY_INTERVAL_IN_SECONDS = 1 class BaseNamespace(object): 'Define client behavior' - def __init__(self, _transport, path): - self._transport = _transport + def __init__(self, client, path): + self._client = client; self.path = path self._callback_by_event = {} self.initialize() @@ -38,15 +38,12 @@ class BaseNamespace(object): 'Initialize custom variables here; you can override this method' pass - def message(self, data='', callback=None): - self._transport.message(self.path, data, callback) - def emit(self, event, *args, **kw): callback, args = find_callback(args, kw) - self._transport.emit(self.path, event, args, callback) + self._client.emit(self.path, event, args, callback) def disconnect(self): - self._transport.disconnect(self.path) + self._client.disconnect(self.path) def on(self, event, callback): 'Define a callback to handle a custom event emitted by the server' @@ -168,31 +165,31 @@ class SocketIO(object): self.disconnect() self._terminate_heartbeat(); - def _terminate_heartbeat(self): - """Terminates the heartbeat thread. - - """ - if self.heartbeat_terminator is not None: - self.heartbeat_terminator.set(); - self.heartbeat_thread.join(); - def define(self, Namespace, path=''): - if path: - self._transport.connect(path) - namespace = Namespace(self._transport, path) + _log.debug("[define] Path: %s" % path); + namespace = Namespace(self, path) self._namespace_by_path[path] = namespace + + if path: + try: + self._transport.connect(path); + except ConnectionError as e: + _log.warn("[define] Connection error: %s" % str(e)); + self._handle_severed_connection(); + return namespace def on(self, event, callback, path=''): return self.get_namespace(path).on(event, callback) - def message(self, data='', callback=None, path=''): - self._transport.message(path, data, callback) - def emit(self, event, *args, **kw): path = kw.get('path', '') callback, args = find_callback(args, kw) - self._transport.emit(path, event, args, callback) + try: + self._transport.emit(path, event, args, callback); + except ConnectionError as e: + _log.warn("[emit] Connection error: %s" % str(e)); + self._handle_severed_connection(); def reconnect(self): """Reconnects the client. @@ -205,8 +202,8 @@ class SocketIO(object): # Reconnect to the server. if self.__transport is not None: - self.__transport.close(); - self.__transport = self._get_transport(); + self.__transport.close(); + self.__transport = None; for path in self.reconnect_paths: # We avoid reconnecting to the default namespace because @@ -217,10 +214,21 @@ class SocketIO(object): self._transport.connect(path); # Restore paths. self._namespace_by_path = copy.copy(self.reconnect_paths); - for namespace in self._namespace_by_path: - self._namespace_by_path[namespace]._transport = self.__transport; self.reconnect_paths = {}; + def _handle_severed_connection(self): + """Handles severed (unexpectedly terminated) connections + + """ + self._terminate_heartbeat(); + if len(self.reconnect_paths) is 0: + self.reconnect_paths = copy.copy(self._namespace_by_path); + + self._terminate_heartbeat(); + + for namespace in self.reconnect_paths: + self.disconnect(namespace); + def wait(self, seconds=None, for_callbacks=False): """Wait in a loop and process events as defined in the namespaces. @@ -247,16 +255,7 @@ class SocketIO(object): except ConnectionError as e: try: - # This is where we end up if the connection was - # severed. The client will disconnect here. - if len(self.reconnect_paths) is 0: - self.reconnect_paths = copy.copy(self._namespace_by_path); - - self._terminate_heartbeat(); - - for namespace in self.reconnect_paths: - self.disconnect(namespace); - + self._handle_severed_connection(); warning = Exception('[connection error] %s' % e) warning_screen.throw(warning) except StopIteration: @@ -332,7 +331,11 @@ class SocketIO(object): def disconnect(self, path=''): if self.connected: - self._transport.disconnect(path) + try: + self._transport.disconnect(path) + except ConnectionError as e: + _log.warn("[disconnect] Connection error: %s" % str(e)); + namespace = self._namespace_by_path[path] namespace.on_disconnect() if path: @@ -340,7 +343,7 @@ class SocketIO(object): @property def connected(self): - return self.__transport.connected if self.__transport is not None else False; + return self.__transport.connected if self.__transport is not None else False; @property def _transport(self): @@ -399,6 +402,14 @@ class SocketIO(object): self._start_heartbeat(websocket); return websocket; + def _terminate_heartbeat(self): + """Terminates the heartbeat thread. + + """ + if self.heartbeat_terminator is not None: + self.heartbeat_terminator.set(); + self.heartbeat_thread.join(); + def _start_heartbeat(self, transport): """Starts the heartbeat thread. @@ -413,6 +424,11 @@ class SocketIO(object): it's waiting. """ + + if self.heartbeat_thread is not None and not self.heartbeat_terminator.is_set(): + _log.warn("[start hearbeat] heartbeat already started... terminating old heartbeat"); + self._terminate_heartbeat(); + _log.debug("[start heartbeat pacemaker]"); self.heartbeat_terminator = multiprocessing.Event(); self.heartbeat_thread = multiprocessing.Process( @@ -542,8 +558,14 @@ class SocketIO(object): def _prepare_to_send_ack(self, path, packet_id): 'Return function that acknowledges the server' - return lambda *args: self._transport.ack(path, packet_id, *args) + return lambda *args: _send_ack(self, path, packet_id, *args); +def _send_ack(socketio, path, packet_id, *args): + try: + socketio._transport.ack(path, packet_id, *args); + except ConnectionError as e: + _log.warn("[send ack] Connection error: %s" % str(e)); + socketio._handle_severed_connection(); def find_callback(args, kw=None): 'Return callback whether passed as a last argument or as a keyword' @@ -618,6 +640,9 @@ def _make_heartbeat_pacemaker(terminator, transport, heartbeat_interval): _log.debug("[hearbeat]"); try: transport.send_heartbeat(); + except requests.exceptions.ConnectionError as e: + message = "[heartbeat] disconnected while sending PING"; + _log.warn(message); except: pass; _log.debug("[heartbeat terminated]"); From a1f37e8e60ec94f3a857d95248bae62adb9fd4ff Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Tue, 23 Dec 2014 21:38:42 -0800 Subject: [PATCH 15/22] Added an event queue so that emitted events that failed can be automatically retried upon reconnection. Also fixed a potential infinite loop bug wrt reconnecting automatically --- socketIO_client/__init__.py | 87 +++++++++++++++++++++++++++---------- 1 file changed, 65 insertions(+), 22 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 5026cb9..6c5b41c 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -38,9 +38,8 @@ class BaseNamespace(object): 'Initialize custom variables here; you can override this method' pass - def emit(self, event, *args, **kw): - callback, args = find_callback(args, kw) - self._client.emit(self.path, event, args, callback) + def emit(self, event_name, *args, **kw): + self._client.emit(event_name, path = self.path, *args, **kw) def disconnect(self): self._client.disconnect(self.path) @@ -110,6 +109,15 @@ class BaseNamespace(object): 'on_' + event.replace(' ', '_'), lambda *args: self.on_event(event, *args)) +class SocketIOEvent(object): + def __init__(self, path, name, args, callback): + self.path = path; + self.name = name; + self.args = args; + self.callback = callback; + + def __str__(self): + return str(self.path) + "/" + str(self.name) + "(" + str(self.args) + ")(" + str(self.callback) + ")"; class SocketIO(object): """Create a socket.io client that connects to a socket.io server @@ -152,6 +160,12 @@ class SocketIO(object): # This sets of a chain of events that attempts to connect to # the server at the base namespace. self.define(Namespace) + self._transport.connect(""); + + # Events that fail to emit due to connection errors will be + # placed in this 'queue' and re-sent automatically upon + # reconnect. + self._event_retry_queue = []; def __enter__(self): _log.debug("[enter]"); @@ -182,11 +196,24 @@ class SocketIO(object): def on(self, event, callback, path=''): return self.get_namespace(path).on(event, callback) - def emit(self, event, *args, **kw): + def _emit_event(self, event): + """Emits an Emittable object. + + This function enables automatically re-emitting events that + failed due to connection errors. + + """ + self._transport.emit(event.path, event.name, event.args, event.callback); + + def emit(self, event_name, *args, **kw): path = kw.get('path', '') callback, args = find_callback(args, kw) + event = SocketIOEvent(path, event_name, args, callback); + self._event_retry_queue.append(event); try: - self._transport.emit(path, event, args, callback); + #import ipdb; ipdb.set_trace(); + self._emit_event(event); + self._event_retry_queue.pop(); except ConnectionError as e: _log.warn("[emit] Connection error: %s" % str(e)); self._handle_severed_connection(); @@ -205,6 +232,11 @@ class SocketIO(object): self.__transport.close(); self.__transport = None; + # We call the _create_transport directly ahead of the loop + # below so that self._tranport will not result in an infinite + # loop. + self._create_transport(); + for path in self.reconnect_paths: # We avoid reconnecting to the default namespace because # socketIO_client connects to that already. @@ -216,6 +248,11 @@ class SocketIO(object): self._namespace_by_path = copy.copy(self.reconnect_paths); self.reconnect_paths = {}; + # Send any pending events. + for event in self._event_retry_queue: + _log.debug("[reconnect] Re-emitting event: %s" % str(event)); + self._emit_event(event); + def _handle_severed_connection(self): """Handles severed (unexpectedly terminated) connections @@ -225,9 +262,10 @@ class SocketIO(object): self.reconnect_paths = copy.copy(self._namespace_by_path); self._terminate_heartbeat(); + self.__transport = None; for namespace in self.reconnect_paths: - self.disconnect(namespace); + self.disconnect(namespace, skip_transport_disconnect = True); def wait(self, seconds=None, for_callbacks=False): """Wait in a loop and process events as defined in the namespaces. @@ -235,16 +273,7 @@ class SocketIO(object): - Omit seconds, i.e. call wait() without arguments, to wait forever. """ warning_screen = _yield_warning_screen(seconds) - for elapsed_time in warning_screen: - # We will end up here in the case that we - # disconnected. - if len(self.reconnect_paths) > 0: - try: - self.reconnect(); - except ConnectionError as e: - time.sleep(RETRY_INTERVAL_IN_SECONDS); - continue; - + for elapsed_time in warning_screen: try: try: self._process_events() @@ -329,15 +358,15 @@ class SocketIO(object): def wait_for_callbacks(self, seconds=None): self.wait(seconds, for_callbacks=True) - def disconnect(self, path=''): - if self.connected: + def disconnect(self, path='', skip_transport_disconnect = False): + if self.connected and not skip_transport_disconnect: try: self._transport.disconnect(path) except ConnectionError as e: _log.warn("[disconnect] Connection error: %s" % str(e)); - namespace = self._namespace_by_path[path] - namespace.on_disconnect() + namespace = self._namespace_by_path[path] + namespace.on_disconnect() if path: del self._namespace_by_path[path] @@ -345,6 +374,10 @@ class SocketIO(object): def connected(self): return self.__transport.connected if self.__transport is not None else False; + def _create_transport(self): + _log.debug("[create transport]"); + self.__transport = self._get_transport(); + @property def _transport(self): try: @@ -355,8 +388,7 @@ class SocketIO(object): warning_screen = _yield_warning_screen(seconds=None) for elapsed_time in warning_screen: try: - _log.debug("[create transport]"); - self.__transport = self._get_transport() + self._create_transport(); break except ConnectionError as e: if not self.wait_for_connection: @@ -366,6 +398,17 @@ class SocketIO(object): warning_screen.throw(warning) except StopIteration: _log.warn(warning) + + continue; + # If we disconnected before, self.reconnected_paths will be + # non-empty. + while len(self.reconnect_paths) > 0: + try: + self.reconnect(); + except ConnectionError as e: + time.sleep(RETRY_INTERVAL_IN_SECONDS); + continue; + return self.__transport def _upgrade(self): From f06bf8e7364d0b9b0b99679c95eb9ed50eff1559 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Tue, 23 Dec 2014 21:39:26 -0800 Subject: [PATCH 16/22] Fixed a typo bug. Also added better disconnect handling in the websockets send method --- socketIO_client/transports.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 18e27ae..96a4045 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -130,7 +130,7 @@ class WebsocketTransport(_AbstractTransport): self._url = '%s://%s/?EIO=%d&transport=websocket&sid=%s' % ( 'wss' if is_secure else 'ws', - base_url, parser.ENGINE_PROTOCOL, socketIO_session.id) + base_url, parser.ENGINEIO_PROTOCOL, socketIO_session.id) try: _log.debug("[websocket] Connecting"); @@ -159,11 +159,16 @@ class WebsocketTransport(_AbstractTransport): def send(self, packet_text): _log.debug("[websocket] send: " + str(packet_text)); try: - self._connection.send(packet_text) + self._connection.ping(); + self._connection.send(packet_text); except websocket.WebSocketTimeoutException as e: message = 'timed out while sending %s (%s)' % (packet_text, e) _log.warn(message) raise TimeoutError(e) + except websocket.WebSocketConnectionClosedException as e: + message = 'disconnected while sending %s (%s)' % (packet_text, e); + _log.warn(message); + raise ConnectionError(message); except socket.error as e: message = 'disconnected while sending %s (%s)' % (packet_text, e) _log.warn(message) @@ -202,7 +207,7 @@ class XHR_PollingTransport(_AbstractTransport): super(XHR_PollingTransport, self).__init__() self._url = '%s://%s/?EIO=%d&transport=polling&sid=%s' % ( 'https' if is_secure else 'http', - base_url, parser.ENGINE_PROTOCOL, socketIO_session.id) + base_url, parser.ENGINEIO_PROTOCOL, socketIO_session.id) self._connected = True self._http_session = _prepare_http_session(kw) self._waiting = False; From dc8e8677410573fc009b2d3f17259f10d49f3231 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Tue, 23 Dec 2014 21:39:48 -0800 Subject: [PATCH 17/22] Added tests that verify restart functionality is working as expected --- socketIO_client/tests.py | 68 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/socketIO_client/tests.py b/socketIO_client/tests.py index ea20282..a0e6223 100644 --- a/socketIO_client/tests.py +++ b/socketIO_client/tests.py @@ -1,4 +1,5 @@ import logging +from subprocess import check_call import time from unittest import TestCase @@ -15,6 +16,9 @@ class BaseMixin(object): def setUp(self): self.called_on_response = False + + # Start the server if it's not already started. + check_call("./start-test-server.sh"); def tearDown(self): del self.socketIO @@ -27,6 +31,65 @@ class BaseMixin(object): self.assertEqual(arg, DATA) self.called_on_response = True + def test_server_dies_during_define(self): + 'Server dies in the middle of a send' + self.socketIO.wait(self.wait_time_in_seconds); + self.assertTrue(self.socketIO.connected); + + check_call("./kill-test-server.sh"); + + news_namespace = self.socketIO.define(Namespace, '/news'); + self.assertFalse(self.socketIO.connected); + + check_call("./start-test-server.sh"); + + main_namespace = self.socketIO.define(Namespace); + chat_namespace = self.socketIO.define(Namespace, '/chat'); + news_namespace = self.socketIO.define(Namespace, '/news'); + news_namespace.emit('emit_with_payload', PAYLOAD); + + self.socketIO.wait(self.wait_time_in_seconds); + self.assertEqual(main_namespace.args_by_event, {}); + self.assertEqual(chat_namespace.args_by_event, {}); + self.assertEqual(news_namespace.args_by_event, { + 'emit_with_payload_response': (PAYLOAD,), + }); + + def test_server_dies_during_emit_and_re_emit(self): + 'Server dies in the middle of a send' + namespace = self.socketIO.define(Namespace); + + self.socketIO.wait(self.wait_time_in_seconds); + self.assertTrue(self.socketIO.connected); + + check_call("./kill-test-server.sh"); + + self.socketIO.emit("emit"); + self.assertFalse(self.socketIO.connected); + check_call("./start-test-server.sh"); + + #self.socketIO.emit("emit"); + self.socketIO.wait(self.wait_time_in_seconds); + self.assertEqual(namespace.args_by_event, { + 'emit_response': (), + }); + + def test_server_restart(self): + 'Server restart' + self.assertTrue(self.socketIO.connected); + + check_call("./kill-test-server.sh"); + time.sleep(2); + self.socketIO.wait(self.wait_time_in_seconds) + + self.assertFalse(self.socketIO.connected); + + check_call("./start-test-server.sh"); + time.sleep(2); + self.socketIO.wait(self.wait_time_in_seconds); + + self.assertTrue(self.socketIO.connected); + def test_disconnect(self): 'Disconnect' self.assertTrue(self.socketIO.connected) @@ -143,21 +206,20 @@ class Test_WebsocketTransport(BaseMixin, TestCase): self.socketIO = SocketIO(HOST, PORT) self.wait_time_in_seconds = 0.1 - class Test_XHR_PollingTransport(BaseMixin, TestCase): def setUp(self): super(Test_XHR_PollingTransport, self).setUp() self.socketIO = SocketIO(HOST, PORT) self.wait_time_in_seconds = 1 - +""" class Test_JSONP_PollingTransport(TestCase, BaseMixin): def setUp(self): super(Test_JSONP_PollingTransport, self).setUp() self.socketIO = SocketIO(HOST, PORT, transports = ['jsonp-polling']) self.wait_time_in_seconds = 1; - +""" class Namespace(BaseNamespace): def initialize(self): From b059cdf02c336894ff3b5387163d2f90b65cb609 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Thu, 25 Dec 2014 14:06:32 -0800 Subject: [PATCH 18/22] Re-arranged the call order of connecting to avoid interrupted connections --- socketIO_client/__init__.py | 46 ++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 6c5b41c..3793c60 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -24,6 +24,7 @@ _SocketIOSession = namedtuple('_SocketIOSession', [ ]) _log = logging.getLogger(__name__) RETRY_INTERVAL_IN_SECONDS = 1 +MAX_UPGRADE_RETRIES = 3; class BaseNamespace(object): 'Define client behavior' @@ -75,15 +76,15 @@ class BaseNamespace(object): def on_noop(self): 'Called after server sends a noop; you can override this method' - _log.info('%s [noop]', self.path) + _log.debug('%s [noop]', self.path) def on_ping(self): 'Called after server sends a ping; you can override this method' - _log.info('%s [ping]', self.path) + _log.debug('%s [ping]', self.path) def on_pong(self): 'Called after server sends a pong; you can override this method' - _log.info('%s [pong]', self.path) + _log.debug('%s [pong]', self.path) def on_open(self, *args): _log.info('%s [open] %s', self.path, args) @@ -434,16 +435,15 @@ class SocketIO(object): if packet.type == PacketType.PONG: _log.debug("[PONG] %s" % repr(packet)); - self._terminate_heartbeat(); - # Technically we would need to pause the current # transport (which should be polling in this # implementation), but since we haven't actually # started a polling yet, we can upgrade without that. _log.debug("[upgrading] Sending upgrade request"); websocket.send_packet(PacketType.UPGRADE); - self._start_heartbeat(websocket); return websocket; + else: + self._process_packet(packet) def _terminate_heartbeat(self): """Terminates the heartbeat thread. @@ -484,26 +484,33 @@ class SocketIO(object): # Negotiate initial transport transport = transports.XHR_PollingTransport(self.session, self.is_secure, self.base_url, **self.kw); - # Update namespaces - for path, namespace in self._namespace_by_path.items(): - namespace._transport = transport - transport.connect(path) - transport.set_timeout(self.session.connection_timeout); - # Start the heartbeat pacemaker (PING). - self._start_heartbeat(transport); - # If websocket is available, upgrade to it immediately. # TODO(sean): We could run this on a separate thread for # maximum efficiency although that would require some # synchronization to ensure buffers are flushed, etc. + num_retries = 0; if "websocket" in self.session.server_supported_transports: - try: - return self._upgrade(); - except: - _log.warn("[websocket] Failed to upgrade to websocket") - pass; + while num_retries < MAX_UPGRADE_RETRIES: + try: + transport = self._upgrade(); + break; + except: + pass; + time.sleep(1); + num_retries += 1; + + if num_retries == MAX_UPGRADE_RETRIES: + _log.warn("[websocket] Failed to upgrade to websocket"); + + # Update namespaces + for path, namespace in self._namespace_by_path.items(): + namespace._transport = transport + transport.connect(path) + + # Start the heartbeat pacemaker (PING). + self._start_heartbeat(transport); return transport @@ -669,6 +676,7 @@ def _get_socketIO_session(is_secure, base_url, **kw): return None; handshake = json.loads(packet.payload); + _log.info("socket.io client connected to: %s" % base_url); return _SocketIOSession( id = handshake["sid"], From 5af21af575968eecb82fc20836b22f14b2e13191 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Thu, 25 Dec 2014 22:52:47 -0800 Subject: [PATCH 19/22] Small updates that fix a couple of connection issues --- socketIO_client/__init__.py | 11 ++++++++--- socketIO_client/transports.py | 5 +++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 3793c60..2b45c25 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -161,7 +161,6 @@ class SocketIO(object): # This sets of a chain of events that attempts to connect to # the server at the base namespace. self.define(Namespace) - self._transport.connect(""); # Events that fail to emit due to connection errors will be # placed in this 'queue' and re-sent automatically upon @@ -430,6 +429,7 @@ class SocketIO(object): """ websocket = transports.WebsocketTransport(self.session, self.is_secure, self.base_url, **self.kw); websocket.send_packet(PacketType.PING, "", "probe"); + for packet in websocket.recv_packet(): _log.debug("[websocket] Packet: %s" % str(packet)); if packet.type == PacketType.PONG: @@ -486,6 +486,11 @@ class SocketIO(object): transport = transports.XHR_PollingTransport(self.session, self.is_secure, self.base_url, **self.kw); transport.set_timeout(self.session.connection_timeout); + # Wait for the response that we connected. + packet = None; + while packet == None or packet.type != PacketType.MESSAGE or packet.payload.type != MessageType.CONNECT: + packet = transport.recv().next(); + # If websocket is available, upgrade to it immediately. # TODO(sean): We could run this on a separate thread for # maximum efficiency although that would require some @@ -496,7 +501,7 @@ class SocketIO(object): try: transport = self._upgrade(); break; - except: + except Exception, e: pass; time.sleep(1); num_retries += 1; @@ -504,7 +509,7 @@ class SocketIO(object): if num_retries == MAX_UPGRADE_RETRIES: _log.warn("[websocket] Failed to upgrade to websocket"); - # Update namespaces + # Update namespaces for path, namespace in self._namespace_by_path.items(): namespace._transport = transport transport.connect(path) diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 96a4045..f62e997 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -133,12 +133,13 @@ class WebsocketTransport(_AbstractTransport): base_url, parser.ENGINEIO_PROTOCOL, socketIO_session.id) try: - _log.debug("[websocket] Connecting"); - self._connection = websocket.create_connection(self._url) + _log.debug("[websocket] Connecting to: %s" % self._url); + self._connection = websocket.create_connection(self._url); except socket.timeout as e: raise ConnectionError(e) except socket.error as e: raise ConnectionError(e) + self._connection.settimeout(TIMEOUT_IN_SECONDS) @property From ef64649a2a4fe22a72dfdd2de96f1b1d5970c15e Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Thu, 25 Dec 2014 23:27:19 -0800 Subject: [PATCH 20/22] Added support for forcing websocket connections if available. Also fixed a small bug that seemed to be causing a race condition on first connections due to a failure to consume a packet on opening the default namespace --- socketIO_client/__init__.py | 12 +++++++++--- socketIO_client/transports.py | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 2b45c25..8188bec 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -138,11 +138,16 @@ class SocketIO(object): """ def __init__( - self, host, port=None, Namespace=BaseNamespace, - wait_for_connection=True, **kw): + self, host, + port = None, + Namespace = BaseNamespace, + wait_for_connection = True, + force_websockets_if_available = True, + **kw): self.is_secure, self.base_url = _parse_host(host, port) self.wait_for_connection = wait_for_connection self._namespace_by_path = {} + self.force_websockets_if_available = force_websockets_if_available; self.kw = kw self.__transport = None; @@ -161,6 +166,7 @@ class SocketIO(object): # This sets of a chain of events that attempts to connect to # the server at the base namespace. self.define(Namespace) + self._transport.connect(""); # Events that fail to emit due to connection errors will be # placed in this 'queue' and re-sent automatically upon @@ -497,7 +503,7 @@ class SocketIO(object): # synchronization to ensure buffers are flushed, etc. num_retries = 0; if "websocket" in self.session.server_supported_transports: - while num_retries < MAX_UPGRADE_RETRIES: + while num_retries < MAX_UPGRADE_RETRIES or self.force_websockets_if_available: try: transport = self._upgrade(); break; diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index f62e997..ad88ebb 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -61,6 +61,7 @@ class _AbstractTransport(object): self._packets.append(packet); else: self.send_packet(PacketType.OPEN, path, data); + self.recv().next(); def send_heartbeat(self): self.send_packet(PacketType.PING) From 2a43420e1b503100ea51cbe44d4db6f3b447183c Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Wed, 7 Jan 2015 13:14:19 -0800 Subject: [PATCH 21/22] Added scripts to start and kill test server --- kill-test-server.sh | 18 ++++++++++++++++++ start-test-server.sh | 25 +++++++++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100755 kill-test-server.sh create mode 100755 start-test-server.sh diff --git a/kill-test-server.sh b/kill-test-server.sh new file mode 100755 index 0000000..6823405 --- /dev/null +++ b/kill-test-server.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +pid=$(ps ax | grep node | grep serve_tests | grep -v grep | awk '{print $1}'); +if [[ "$pid" == "" ]]; then + echo "Server not started." + exit 0; +fi + +kill ${pid} + +# Ensure it's dead... Jim +pid=$(ps ax | grep node | grep serve_tests | grep -v grep | awk '{print $1}'); +if [[ "$pid" != "" ]]; then + echo "Server didn't die :(." + exit 1; +else + exit 0; +fi diff --git a/start-test-server.sh b/start-test-server.sh new file mode 100755 index 0000000..05f832b --- /dev/null +++ b/start-test-server.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +pid=$(ps ax | grep node | grep serve_tests | grep -v grep | awk '{print $1}'); +if [[ "$pid" != "" ]]; then + #echo "Server already started." + exit 0; +fi + +dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) + +# Start the server (pipe output to server.log) +node ${dir}/serve_tests.js &> ${dir}/server.log & +pid=$!; + +# Wait a second so the server has a chance to start. +sleep 1; + +check=$(ps ax | grep ${pid} | grep -v grep); + +if [[ "$check" == "" ]]; then + echo "Server failed to start"; + exit 1; +else + exit 0; +fi From 8e23fc73a5582d9a9b8ff2fcf12da2165650f295 Mon Sep 17 00:00:00 2001 From: Sean Arietta Date: Sat, 31 Jan 2015 21:54:04 -0800 Subject: [PATCH 22/22] Switched from multiprocessing to thread --- socketIO_client/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index 8188bec..3a8ecee 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -2,10 +2,10 @@ from collections import namedtuple import copy import logging import json -import multiprocessing import parser from parser import Message, Packet, MessageType, PacketType import requests +import threading import time try: @@ -479,10 +479,11 @@ class SocketIO(object): self._terminate_heartbeat(); _log.debug("[start heartbeat pacemaker]"); - self.heartbeat_terminator = multiprocessing.Event(); - self.heartbeat_thread = multiprocessing.Process( + self.heartbeat_terminator = threading.Event(); + self.heartbeat_thread = threading.Thread( target = _make_heartbeat_pacemaker, args = (self.heartbeat_terminator, transport, self.session.heartbeat_interval / 2)); + self.heartbeat_thread.daemon = True; self.heartbeat_thread.start(); def _get_transport(self):