297 lines
12 KiB
Python
297 lines
12 KiB
Python
# ______________________________________________________________________
|
|
# Module imports
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import opcode
|
|
import pprint
|
|
import inspect
|
|
|
|
from . import opcode_util
|
|
from .bytecode_visitor import BasicBlockVisitor, BenignBytecodeVisitorMixin
|
|
from .control_flow import ControlFlowGraph
|
|
|
|
# ______________________________________________________________________
|
|
# Module data
|
|
|
|
# The following opcodes branch based on the control (a.k.a. frame)
|
|
# stack:
|
|
RETURN_VALUE, CONTINUE_LOOP, BREAK_LOOP, END_FINALLY, RAISE_VARARGS = (
|
|
opcode.opmap[opname] for opname in (
|
|
'RETURN_VALUE', 'CONTINUE_LOOP', 'BREAK_LOOP', 'END_FINALLY',
|
|
'RAISE_VARARGS'))
|
|
|
|
# The following opcodes push a new frame on the control stack:
|
|
SETUP_EXCEPT, SETUP_FINALLY, SETUP_LOOP, SETUP_WITH = (
|
|
opcode.opmap.get(opname, None) for opname in (
|
|
'SETUP_EXCEPT', 'SETUP_FINALLY', 'SETUP_LOOP', 'SETUP_WITH'))
|
|
|
|
WHY_NOT = 1
|
|
WHY_EXCEPTION = WHY_NOT << 1
|
|
WHY_RERAISE = WHY_EXCEPTION << 1 # We don't worry about this code
|
|
# during CFA, since its primary use
|
|
# is to log traceback information;
|
|
# WHY_RERAISE's bytecode control flow
|
|
# is the same as WHY_EXCEPTION.
|
|
WHY_RETURN = WHY_RERAISE << 1
|
|
WHY_BREAK = WHY_RETURN << 1
|
|
WHY_CONTINUE = WHY_BREAK << 1
|
|
WHY_YIELD = WHY_CONTINUE << 1
|
|
|
|
# ______________________________________________________________________
|
|
# Class definition(s)
|
|
|
|
class ControlFlowBuilder (BenignBytecodeVisitorMixin, BasicBlockVisitor):
|
|
'''Visitor responsible for traversing a bytecode basic block map and
|
|
building a control flow graph (CFG).
|
|
|
|
The primary purpose of this transformation is to create a CFG,
|
|
which is used by later transformers for dataflow analysis.
|
|
'''
|
|
def visit (self, flow, nargs = 0, *args, **kws):
|
|
'''Given a map of bytecode basic blocks, and an optional
|
|
number of arguments, return a
|
|
:py:class:`llpython.control_flow.ControlFlowGraph` instance
|
|
describing the full control flow of the bytecode flow.'''
|
|
self.nargs = nargs
|
|
ret_val = super(ControlFlowBuilder, self).visit(flow, *args, **kws)
|
|
del self.nargs
|
|
return ret_val
|
|
|
|
def enter_blocks (self, blocks):
|
|
super(ControlFlowBuilder, self).enter_blocks(blocks)
|
|
self.blocks = blocks
|
|
self.block_list = list(blocks.keys())
|
|
self.block_list.sort()
|
|
self.cfg = ControlFlowGraph()
|
|
self.control_stack = []
|
|
self.continue_targets = {} # Map from SETUP_LOOP addresses to
|
|
# start of loop addresses, based on
|
|
# observed CONTINUE_LOOP opcodes.
|
|
self.break_targets = set() # Set of SETUP_LOOP address that
|
|
# have at least one observed
|
|
# BREAK_LOOP opcode corresponding
|
|
# to them.
|
|
for block in self.block_list:
|
|
self.cfg.add_block(block, blocks[block])
|
|
|
|
def exit_blocks (self, blocks):
|
|
super(ControlFlowBuilder, self).exit_blocks(blocks)
|
|
assert self.blocks == blocks
|
|
self.cfg.unlink_unreachables()
|
|
self.cfg.compute_dataflow()
|
|
self.cfg.update_for_ssa()
|
|
ret_val = self.cfg
|
|
del self.continue_targets
|
|
del self.control_stack
|
|
del self.cfg
|
|
del self.block_list
|
|
del self.blocks
|
|
return ret_val
|
|
|
|
def enter_block (self, block):
|
|
self.block = block
|
|
assert block in self.cfg.blocks
|
|
if block == 0:
|
|
for local_index in range(self.nargs):
|
|
self.op_STORE_FAST(0, opcode.opmap['STORE_FAST'], local_index)
|
|
return True
|
|
|
|
def _get_next_block (self, block):
|
|
return self.block_list[self.block_list.index(block) + 1]
|
|
|
|
def _generate_handler_edge (self, block, i, op, arg, why):
|
|
"""Given a reason (corresponding to the why code in
|
|
Python/ceval.c), interrupt control flow, possibly using the
|
|
control flow (a.k.a. frame) stack to calculate the next
|
|
target.
|
|
|
|
Returns True if an edge was added to the CFG, False otherwise.
|
|
Based on the opcode the return result may mean different
|
|
things (for example: if why == WHY_RETURN, then a False return
|
|
result means the function returned, and no edge was
|
|
generated)."""
|
|
ret_val = False
|
|
if len(self.control_stack) > 0:
|
|
handlers = set((SETUP_FINALLY, SETUP_WITH))
|
|
if why == WHY_EXCEPTION:
|
|
handlers.add(SETUP_EXCEPT)
|
|
reversed_stack = reversed(self.control_stack)
|
|
target = None
|
|
for handler_i, handler_op, handler_arg in reversed_stack:
|
|
if handler_op in handlers:
|
|
target = handler_i + handler_arg + 3
|
|
elif handler_op == SETUP_LOOP:
|
|
if why == WHY_CONTINUE:
|
|
# Only generate a WHY_CONTINUE edge if a continue
|
|
# statement has been observed for this loop.
|
|
if handler_i not in self.continue_targets:
|
|
break
|
|
elif op == CONTINUE_LOOP:
|
|
target = arg
|
|
assert target == self.continue_targets[handler_i]
|
|
else:
|
|
target = self.continue_targets[handler_i]
|
|
elif why == WHY_BREAK:
|
|
# Only generate a WHY_BREAK edge if a break
|
|
# statement has been observed for this loop.
|
|
if handler_i not in self.break_targets:
|
|
break
|
|
else:
|
|
target = handler_i + handler_arg + 3
|
|
if target is not None:
|
|
self.cfg.add_edge(block, target)
|
|
ret_val = True
|
|
break
|
|
return ret_val
|
|
|
|
def exit_block (self, block):
|
|
assert block == self.block
|
|
del self.block
|
|
i, op, arg = self.blocks[block][-1]
|
|
goto_next = False
|
|
if op == RETURN_VALUE:
|
|
self._generate_handler_edge(block, i, op, arg, WHY_RETURN)
|
|
elif op == CONTINUE_LOOP:
|
|
branched = self._generate_handler_edge(block, i, op, arg,
|
|
WHY_CONTINUE)
|
|
assert branched, ("Attempted to continue outside of loop %r" %
|
|
(self.blocks[block][-1],))
|
|
elif op == BREAK_LOOP:
|
|
branched = self._generate_handler_edge(block, i, op, arg,
|
|
WHY_BREAK)
|
|
assert branched, ("Attempted to break outside of loop %r" %
|
|
(self.blocks[block][-1],))
|
|
elif op == RAISE_VARARGS:
|
|
self._generate_handler_edge(block, i, op, arg, WHY_EXCEPTION)
|
|
elif op == END_FINALLY:
|
|
# The following does a lot of redundant traversal of the
|
|
# simulated frame stack, but it works, and keeps a lot of
|
|
# special case logic out of _generate_handler_edge().
|
|
self._generate_handler_edge(block, i, op, arg, WHY_EXCEPTION)
|
|
self._generate_handler_edge(block, i, op, arg, WHY_RETURN)
|
|
self._generate_handler_edge(block, i, op, arg, WHY_BREAK)
|
|
self._generate_handler_edge(block, i, op, arg, WHY_CONTINUE)
|
|
goto_next = True # why == WHY_NOT
|
|
elif op in opcode.hasjabs:
|
|
self.cfg.add_edge(block, arg)
|
|
elif op in opcode.hasjrel:
|
|
self.cfg.add_edge(block, i + arg + 3)
|
|
else:
|
|
goto_next = True
|
|
if op in opcode_util.hascbranch or goto_next:
|
|
self.cfg.add_edge(block, self._get_next_block(block))
|
|
|
|
# ____________________________________________________________
|
|
# LOAD/STORE_FAST
|
|
|
|
def op_LOAD_FAST (self, i, op, arg, *args, **kws):
|
|
self.cfg.blocks_reads[self.block].add(arg)
|
|
return super(ControlFlowBuilder, self).op_LOAD_FAST(i, op, arg, *args,
|
|
**kws)
|
|
|
|
def op_STORE_FAST (self, i, op, arg, *args, **kws):
|
|
self.cfg.writes_local(self.block, i, arg)
|
|
return super(ControlFlowBuilder, self).op_STORE_FAST(i, op, arg, *args,
|
|
**kws)
|
|
|
|
# ____________________________________________________________
|
|
# *_LOOP: Special loop control flow.
|
|
|
|
def _get_current_loop (self):
|
|
for handler in reversed(self.control_stack):
|
|
if handler[1] == SETUP_LOOP:
|
|
return handler
|
|
return None, None, None
|
|
|
|
def op_BREAK_LOOP (self, i, op, arg, *args, **kws):
|
|
handler_i, _, _ = self._get_current_loop()
|
|
assert handler_i is not None
|
|
self.break_targets.add(handler_i)
|
|
|
|
def op_CONTINUE_LOOP (self, i, op, arg, *args, **kws):
|
|
"""
|
|
CONTINUE_LOOP has to be handled differently than BREAK_LOOP,
|
|
since its argument specifies where the start of the loop is
|
|
(in the case of for-loops, FOR_ITER defines the true start of
|
|
the loop, instead of SETUP_LOOP.)
|
|
"""
|
|
handler_i, _, _ = self._get_current_loop()
|
|
assert handler_i is not None
|
|
if handler_i in self.continue_targets:
|
|
assert arg == self.continue_targets[handler_i]
|
|
else:
|
|
self.continue_targets[handler_i] = arg
|
|
|
|
# ____________________________________________________________
|
|
# POP_BLOCK
|
|
|
|
def op_POP_BLOCK (self, i, op, arg, *args, **kws):
|
|
self.control_stack.pop()
|
|
return super(ControlFlowBuilder, self).op_POP_BLOCK(i, op, arg, *args,
|
|
**kws)
|
|
# ____________________________________________________________
|
|
# SETUP_*
|
|
|
|
def op_SETUP_EXCEPT (self, i, op, arg, *args, **kws):
|
|
self.control_stack.append((i, op, arg))
|
|
return super(ControlFlowBuilder, self).op_SETUP_EXCEPT(i, op, arg,
|
|
*args, **kws)
|
|
|
|
def op_SETUP_FINALLY (self, i, op, arg, *args, **kws):
|
|
self.control_stack.append((i, op, arg))
|
|
return super(ControlFlowBuilder, self).op_SETUP_FINALLY(i, op, arg,
|
|
*args, **kws)
|
|
|
|
def op_SETUP_LOOP (self, i, op, arg, *args, **kws):
|
|
self.control_stack.append((i, op, arg))
|
|
return super(ControlFlowBuilder, self).op_SETUP_LOOP(i, op, arg, *args,
|
|
**kws)
|
|
|
|
def op_SETUP_WITH (self, i, op, arg, *args, **kws):
|
|
self.control_stack.append((i, op, arg))
|
|
return super(ControlFlowBuilder, self).op_SETUP_WITH(i, op, arg, *args,
|
|
**kws)
|
|
|
|
# ____________________________________________________________
|
|
# Class convenience methods
|
|
|
|
@classmethod
|
|
def build_cfg_from_co(cls, co_obj):
|
|
return cls().visit(opcode_util.build_basic_blocks(co_obj),
|
|
co_obj.co_argcount)
|
|
|
|
@classmethod
|
|
def build_cfg(cls, func):
|
|
co_obj = opcode_util.get_code_object(func)
|
|
return cls.build_cfg_from_co(co_obj)
|
|
|
|
# ______________________________________________________________________
|
|
|
|
def build_cfg (func):
|
|
'''Given a Python function, create a bytecode flow, visit the flow
|
|
object, and return a control flow graph.'''
|
|
return ControlFlowBuilder.build_cfg(func)
|
|
|
|
# ______________________________________________________________________
|
|
# Main (self-test) routine
|
|
|
|
def main (*args, **kws):
|
|
def _visit(obj):
|
|
print("_" * 70)
|
|
print(obj)
|
|
if inspect.isfunction(obj):
|
|
cfg = build_cfg(obj)
|
|
else:
|
|
cfg = ControlFlowBuilder.build_cfg_from_co(obj)
|
|
cfg.pprint()
|
|
return opcode_util.visit_code_args(_visit, *args, **kws)
|
|
|
|
# ______________________________________________________________________
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
main(*sys.argv[1:])
|
|
|
|
# ______________________________________________________________________
|
|
# End of byte_control.py
|