Medium-sized refactor. Converted all methods away from old 'code' paradigm to use the PacketType and MessageType enums directly

This commit is contained in:
Sean Arietta 2014-12-22 17:43:42 -08:00
commit 16c64437d3
2 changed files with 87 additions and 102 deletions

View file

@ -67,14 +67,6 @@ class BaseNamespace(object):
'Called after server disconnects; you can override this method'
_log.debug('%s [disconnect]', self.path)
def on_heartbeat(self):
'Called after server sends a heartbeat; you can override this method'
_log.debug('%s [heartbeat]', self.path)
def on_message(self, data):
'Called after server sends a message; you can override this method'
_log.info('%s [message] %s', self.path, data)
def on_event(self, event, *args):
"""
Called after server sends an event; you can override this method.
@ -88,14 +80,22 @@ class BaseNamespace(object):
callback(*args)
_log.info('%s [event] %s(%s)', self.path, event, ', '.join(arguments))
def on_error(self, reason, advice):
def on_error(self, reason):
'Called after server sends an error; you can override this method'
_log.info('%s [error] %s', self.path, advice)
_log.info('%s [error] %s', self.path, reason)
def on_noop(self):
'Called after server sends a noop; you can override this method'
_log.info('%s [noop]', self.path)
def on_ping(self):
'Called after server sends a ping; you can override this method'
_log.info('%s [ping]', self.path)
def on_pong(self):
'Called after server sends a pong; you can override this method'
_log.info('%s [pong]', self.path)
def on_open(self, *args):
_log.info('%s [open] %s', self.path, args)
@ -249,13 +249,49 @@ class SocketIO(object):
except PacketError as e:
_log.warn('[packet error] %s', e)
def _get_message_delegate(self, code):
try:
return {
MessageType.CONNECT: self._on_connect,
MessageType.DISCONNECT: self._on_disconnect,
MessageType.EVENT: self._on_event,
MessageType.ACK: self._on_ack,
MessageType.ERROR: self._on_error,
MessageType.BINARY_EVENT: self._on_binary_event,
MessageType.BINARY_ACK: self._on_binary_ack
}[code]
except KeyError:
raise PacketError('unexpected code (%s)' % code)
def _get_packet_delegate(self, code):
try:
return {
PacketType.OPEN: self._on_open,
PacketType.CLOSE: self._on_close,
PacketType.PING: self._on_ping,
PacketType.PONG: self._on_pong,
#PacketType.MESSAGE: self._on_message, Handled by other delegates
PacketType.UPGRADE: self._on_upgrade,
PacketType.NOOP: self._on_noop
}[code]
except KeyError:
raise PacketError('unexpected code (%s)' % code)
def _process_packet(self, packet):
code, packet_id, path, data, p = packet
_log.debug("[process packet] %s" % str(packet));
path = packet.payload.path if packet.type == PacketType.MESSAGE else "";
namespace = self.get_namespace(path)
code = packet.payload.type if packet.type == PacketType.MESSAGE else packet.type;
delegate = None;
try:
delegate = self._get_delegate(code)
except:
if packet.type == PacketType.MESSAGE:
_log.debug("[process packet] Handling message");
delegate = self._get_message_delegate(packet.payload.type);
else:
delegate = self._get_packet_delegate(packet.type);
except Exception as e:
_log.warn("[process packet] Could not find delegate for packet: " + str(e));
pass;
if delegate is not None:
delegate(packet, namespace._find_event_callback)
@ -310,9 +346,7 @@ class SocketIO(object):
websocket.send_packet(PacketType.PING, "", "probe");
for packet in websocket.recv_packet():
_log.debug("[websocket] Packet: %s" % str(packet));
(code, packet_id, path, data, p) = packet;
if code == PacketType.PONG:
packet = p;
if packet.type == PacketType.PONG:
_log.debug("[PONG] %s" % repr(packet));
self.heartbeat_terminator.set();
@ -367,49 +401,39 @@ class SocketIO(object):
except KeyError:
raise PacketError('unexpected namespace path (%s)' % path)
def _get_delegate(self, code):
try:
return {
'0': self._on_disconnect,
'1': self._on_connect,
'2': self._on_heartbeat,
'3': self._on_message,
'4': self._on_json,
'5': self._on_event,
'6': self._on_ack,
'7': self._on_error,
'8': self._on_noop,
}[code]
except KeyError:
raise PacketError('unexpected code (%s)' % code)
#################################################################
# Handlers for EngineIO packet types (PacketType in parser.py)
#################################################################
def _on_disconnect(self, packet, find_event_callback):
find_event_callback('disconnect')()
def _on_open(self, packet, find_event_callback):
find_event_callback('open')()
def _on_close(self, packet, find_event_callback):
find_event_callback('close')()
def _on_ping(self, packet, find_event_callback):
find_event_callback('ping')()
def _on_pong(self, packet, find_event_callback):
find_event_callback('pong')()
def _on_upgrade(self, packet, find_event_callback):
find_event_callback('close')()
def _on_noop(self, packet, find_event_callback):
find_event_callback('noop')()
#################################################################
# Handlers for SocketIO "packet" types (MessageType in parser.py)
#################################################################
def _on_connect(self, packet, find_event_callback):
find_event_callback('connect')()
def _on_heartbeat(self, packet, find_event_callback):
find_event_callback('heartbeat')()
def _on_message(self, packet, find_event_callback):
code, packet_id, path, data = packet
args = [data]
if packet_id:
args.append(self._prepare_to_send_ack(path, packet_id))
find_event_callback('message')(*args)
def _on_json(self, packet, find_event_callback):
code, packet_id, path, data = packet
args = [json.loads(data)]
if packet_id:
args.append(self._prepare_to_send_ack(path, packet_id))
find_event_callback('message')(*args)
def _on_disconnect(self, packet, find_event_callback):
find_event_callback('disconnect')()
def _on_event(self, packet, find_event_callback):
code, packet_id, path, data, p = packet
packet = p;
# Accoding to the documentation
# (https://github.com/automattic/socket.io-protocol#event),
# the event name is the first entry in the message array, and
@ -421,29 +445,26 @@ class SocketIO(object):
if packet.payload.id is not None:
args.append(self._prepare_to_send_ack(path, packet_id))
try:
self._namespace_by_path[packet.payload.path]._find_event_callback(event)(*args);
except KeyError:
_log.error("Could not handle event for unknown path: %s" % packet.payload.path);
find_event_callback(event)(*args);
def _on_ack(self, packet, find_event_callback):
code, packet_id, path, data = packet
data_parts = data.split('+', 1)
packet_id = data_parts[0]
event = packet.payload.message[0];
args = packet.payload.message[1:] if len(packet.payload.message) > 1 else [];
packet_id = packet.payload.id;
try:
ack_callback = self._transport.get_ack_callback(packet_id)
except KeyError:
return
args = json.loads(data_parts[1]) if len(data_parts) > 1 else []
ack_callback(*args)
def _on_error(self, packet, find_event_callback):
code, packet_id, path, data = packet
reason, advice = data.split('+', 1)
find_event_callback('error')(reason, advice)
find_event_callback('error')(packet.payload.message)
def _on_noop(self, packet, find_event_callback):
find_event_callback('noop')()
def _on_binary_event(self, packet, find_event_callback):
raise PacketError("Don't know how to handle binary events yet");
def _on_binary_ack(self, packet, find_event_callback):
raise PacketError("Don't know how to handle binary acks yet");
def _prepare_to_send_ack(self, path, packet_id):
'Return function that acknowledges the server'

View file

@ -51,8 +51,6 @@ class _AbstractTransport(object):
while not responded:
for packet in self.recv_packet():
_log.debug("[connect wait] Waiting for confirmation");
(code, packet_id, ignore, data, p) = packet;
packet = p;
if (packet.type == PacketType.MESSAGE
and packet.payload.type == MessageType.CONNECT
and packet.payload.path == path):
@ -99,41 +97,7 @@ class _AbstractTransport(object):
except IndexError:
pass
for packet in self.recv():
code, packet_id, path, data = None, None, '', None
if packet.type is PacketType.OPEN:
code = '1';
elif packet.type is PacketType.CLOSE:
code = '0';
elif packet.type is PacketType.PING:
code = '2';
elif packet.type is PacketType.PONG:
code = PacketType.PONG;
elif packet.type is PacketType.UPGRADE:
_log.warn("Don't know how to handle upgrade packets");
yield code, packet_id, path, data, packet;
elif packet.type is PacketType.NOOP:
code = '8';
elif packet.type is PacketType.MESSAGE:
if packet.payload.type is MessageType.CONNECT:
code = '1';
elif packet.payload.type is MessageType.DISCONNECT:
code = '0';
elif packet.payload.type is MessageType.EVENT:
code = '5';
data = json.dumps({"name": packet.payload.message[0], "args": []});
elif packet.payload.type is MessageType.ACK:
code = '6';
elif packet.payload.type is MessageType.ERROR:
code = '7';
else:
_log.warn("Don't know how to handle message type: %d" % packet.payload.type);
yield code, packet_id, path, data, packet;
else:
_log.warn("Don't know how to handle packet type: %d" % packet.type);
yield code, packet_id, path, data, packet;
yield code, packet_id, path, data, packet
yield packet
def _enqueue_packet(self, packet):
self._packets.append(packet)