Updated implementation to allow callbacks and arg handling that is consistent with reference javascript implementation.

This commit is contained in:
Sean Arietta 2014-12-23 02:19:41 -08:00
commit 66e563acc3
2 changed files with 45 additions and 28 deletions

View file

@ -27,7 +27,6 @@ _log = logging.getLogger(__name__)
PROTOCOL_VERSION = 1
RETRY_INTERVAL_IN_SECONDS = 1
class BaseNamespace(object):
'Define client behavior'
@ -46,11 +45,7 @@ class BaseNamespace(object):
def emit(self, event, *args, **kw):
callback, args = find_callback(args, kw)
if callback is not None:
_log.warn("Callback was specified but is not supported.");
self._transport.emit(self.path, event, args, None)
self._transport.emit(self.path, event, args, callback)
def disconnect(self):
self._transport.disconnect(self.path)
@ -231,7 +226,7 @@ class SocketIO(object):
- Omit seconds, i.e. call wait() without arguments, to wait forever.
"""
warning_screen = _yield_warning_screen(seconds)
for elapsed_time in warning_screen:
for elapsed_time in warning_screen:
# We will end up here in the case that we
# disconnected.
if len(self.reconnect_paths) > 0:
@ -266,6 +261,7 @@ class SocketIO(object):
except StopIteration:
_log.warn(warning)
self._terminate_heartbeat();
_log.debug("[wait canceled]");
def _process_events(self):
@ -476,13 +472,30 @@ class SocketIO(object):
find_event_callback(event)(*args);
def _on_ack(self, packet, find_event_callback):
event = packet.payload.message[0];
args = packet.payload.message[1:] if len(packet.payload.message) > 1 else [];
"""Handles ACK from server.
There are two types of ACKs. The first is when this client
requests that the server responds with an ACK upon execution
of a remote function (specified in the server via
socketio.on()).
The second type is when the server requests that this client
acknowledges that a local function has been executed.
Both are handled the same way from the client's standpoint,
but in latter case the server will actually send along the
event name and the args, but they are currently ignored.
"""
#event = packet.payload.message[0];
packet_id = packet.payload.id;
try:
ack_callback = self._transport.get_ack_callback(packet_id)
ack_callback = self._transport.get_ack_callback(str(packet_id))
except KeyError:
_log.warn("Could not find callback function for packet id: %d" % packet_id);
return
args = packet.payload.message[1:] if len(packet.payload.message) > 1 else [];
ack_callback(*args)
def _on_error(self, packet, find_event_callback):

View file

@ -65,28 +65,30 @@ class _AbstractTransport(object):
def send_heartbeat(self):
self.send_packet(PacketType.PING)
def message(self, path, data, callback):
if isinstance(data, basestring):
code = 3
else:
code = 4
data = json.dumps(data, ensure_ascii=False)
self.send_packet(code, path, data, callback)
def emit(self, path, event, args, callback):
message = Message(MessageType.EVENT, [event, args], path);
self.send_packet(PacketType.MESSAGE, path, message.encode_as_json(), callback)
message_id = self.set_ack_callback(callback) if callback else None;
message = "";
if len(args) > 0:
data = list(args);
data.insert(0, event);
message = Message(MessageType.EVENT, data, path, message_id = message_id);
else:
message = Message(MessageType.EVENT, [event], path, message_id = message_id);
self.send_packet(PacketType.MESSAGE, path, message.encode_as_json())
def ack(self, path, packet_id, *args):
_log.debug("[ack] Sending ACK for packet: %d" % packet_id);
message = Message(MessageType.ACK, "", path, "", packet_id);
data = "";
if len(args) > 0:
data = args;
message = Message(MessageType.ACK, data, path, "", packet_id);
packet = Packet(PacketType.MESSAGE, message);
self.send_engineio_packet(packet);
def noop(self, path=''):
self.send_packet(PacketType.NOOP, path)
def send_packet(self, code, path='', data='', callback=None):
def send_packet(self, code, path='', data=''):
packet_text = Packet(code, data).encode_as_string();
self.send(packet_text)
@ -105,11 +107,13 @@ class _AbstractTransport(object):
def set_ack_callback(self, callback):
'Set callback to be called after server sends an acknowledgment'
self._packet_id += 1
_log.debug("Setting ACK for packet id: %d (%s) [%s]" % (self._packet_id, str(callback), str(self)));
self._callback_by_packet_id[str(self._packet_id)] = callback
return '%s+' % self._packet_id
return self._packet_id
def get_ack_callback(self, packet_id):
'Get callback to be called after server sends an acknowledgment'
_log.debug("Searching for ACK for packet id: %s [%s]" % (packet_id, str(self)));
callback = self._callback_by_packet_id[packet_id]
del self._callback_by_packet_id[packet_id]
return callback
@ -141,16 +145,16 @@ class WebsocketTransport(_AbstractTransport):
def connected(self):
return self._connection.connected
def send_message(self, message, callback = None):
def send_message(self, message):
packet_text = message.encode_as_string();
self.send(packet_text)
def send_engineio_packet(self, packet, callback=None):
def send_engineio_packet(self, packet):
packet_text = packet.encode_as_string(for_websocket = True);
self.send(packet_text)
def send_packet(self, code, path="", data='', callback=None):
self.send_message(Message(code, data), callback);
def send_packet(self, code, path="", data=''):
self.send_message(Message(code, data));
def send(self, packet_text):
_log.debug("[websocket] send: " + str(packet_text));
@ -211,7 +215,7 @@ class XHR_PollingTransport(_AbstractTransport):
def _params(self):
return dict(t=int(time.time() * 1000))
def send_engineio_packet(self, packet, callback=None):
def send_engineio_packet(self, packet):
packet_text = packet.encode_as_string(for_websocket = False);
self.send(packet_text)