Skip to content

Commit

Permalink
fix: some algo simplifications
Browse files Browse the repository at this point in the history
  • Loading branch information
15r10nk committed Mar 18, 2023
1 parent 410075b commit e5d175c
Showing 1 changed file with 115 additions and 135 deletions.
250 changes: 115 additions & 135 deletions executing/_index_node_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
Any,
cast,
)
from dataclasses import dataclass


from .executing import (
EnhancedAST,
NotOneValueFound,
Expand Down Expand Up @@ -64,24 +67,14 @@ def all_equal(seq: Iterator[Any]) -> bool:
return all(s == e for e in rest)


@dataclass
class InstFingerprint:
__slots__ = ("opcode", "code_value", "line", "offset", "index", "next")

def __init__(
self,
opcode: int,
value: Any,
line: Optional[int],
offset: int,
index: int,
next: Tuple[int, ...],
):
self.opcode = opcode
self.code_value = value if isinstance(value, CodeType) else None
self.line = line
self.offset = offset
self.index = index
self.next = next
opcode: int
value: Any
line: Optional[int]
offset: int
index: int
next: Tuple[int, ...]

@property
def opname(self) -> str:
Expand Down Expand Up @@ -153,131 +146,39 @@ def __init__(
):
self.graph: Dict[int, InstFingerprint] = {}

instructions = []
self.instructions: List[dis.Instruction] = []

last_lineno = None
for inst in dis.get_instructions(code):
if inst.starts_line:
last_lineno = inst.starts_line
instructions.append(inst._replace(starts_line=last_lineno))

self.instructions: List[dis.Instruction] = instructions

assert all(i.offset == j * 2 for j, i in enumerate(instructions))

def skip(offset: int) -> Union[int, None]:
done = set()
while offset // 2 < len(instructions) and offset not in done:
done.add(offset)
inst = instructions[offset // 2]
if inst.opcode in skip_opcodes:
offset += 2
elif inst.opcode in (opcodes.JUMP_FORWARD, opcodes.JUMP_ABSOLUTE):
offset = inst.argval
else:
return offset

return None

def next_opcode(offset: int) -> dis.Instruction:
done = set()
inst = None
while offset // 2 < len(instructions) and offset not in done:
done.add(offset)
inst = instructions[offset // 2]
if inst.opcode in (opcodes.NOP, opcodes.EXTENDED_ARG):
offset += 2
elif inst.opcode in (opcodes.JUMP_FORWARD, opcodes.JUMP_ABSOLUTE):
offset = inst.argval
else:
return inst
assert inst is not None
return inst

def optimize_jump(inst: dis.Instruction) -> Tuple[int, int]:
pop = False
if inst.opcode == opcodes.JUMP_IF_FALSE_OR_POP:
# TOS=False
jump_or_pop = opcodes.JUMP_IF_FALSE_OR_POP
pop_and_no_jump = [
opcodes.JUMP_IF_TRUE_OR_POP,
opcodes.POP_JUMP_IF_TRUE,
]
self.instructions.append(inst._replace(starts_line=last_lineno))

pop_and_jump = opcodes.POP_JUMP_IF_FALSE

elif inst.opcode == opcodes.JUMP_IF_TRUE_OR_POP:
# TOS=True
jump_or_pop = opcodes.JUMP_IF_TRUE_OR_POP
pop_and_no_jump = [
opcodes.JUMP_IF_FALSE_OR_POP,
opcodes.POP_JUMP_IF_FALSE,
]

pop_and_jump = opcodes.POP_JUMP_IF_TRUE

else:
assert False, inst

while inst.opcode == jump_or_pop:
inst = next_opcode(inst.argval)

if inst.opcode == pop_and_jump:
pop = True
inst = next_opcode(inst.argval)

if inst.opcode in pop_and_no_jump:
pop = True
inst = next_opcode(inst.offset + 2)

return (pop_and_jump if pop else jump_or_pop, inst.offset)

def optimize_tuple(inst: dis.Instruction) -> Optional[Tuple[int, Any, int]]:
"""
If tuples are loaded with LOAD_CONST or BUILD_TUPLE depends on linenumbers.
This function performes this optimization independent of the line numbers.
"""
if inst.opcode == opcodes.LOAD_CONST and not sys.version_info >= (3, 10):
values = []
while inst.opcode in (opcodes.LOAD_CONST, opcodes.EXTENDED_ARG):
if inst.opcode == opcodes.LOAD_CONST:
values.append(inst.argval)

if inst.offset // 2 == len(instructions):
break
inst = instructions[inst.offset // 2 + 1]

# BUILD_LIST is not 100% correct here but this saves us
# from performance problems on 3.8 in test_extended_arg()
if inst.opcode in (
opcodes.BUILD_TUPLE,
opcodes.BUILD_LIST,
) and inst.argval == len(values):
return (opcodes.LOAD_CONST, tuple(values), inst.offset + 2)

return None
assert all(i.offset == j * 2 for j, i in enumerate(self.instructions))

def get_next(n: int) -> List[Tuple[int, Any]]:
i = current_offset // 2
return [(inst.opcode, inst.argval) for inst in instructions[i : i + n]]
return [(inst.opcode, inst.argval) for inst in self.instructions[i : i + n]]

self.start = skip(0)
self.start = self.skip(0)

todo = [self.start] if self.start is not None else []

while todo:
current_offset = todo.pop()
next_offset = current_offset + 2
inst = instructions[current_offset // 2]
inst = self.instructions[current_offset // 2]

opcode = inst.opcode
value = inst.argval

optimize_result = optimize_tuple(inst)
optimize_result = self.optimize_tuple(inst)

if optimize_result is not None:
opcode, value, next_offset = optimize_result

if get_next(2) == [(opcodes.BUILD_TUPLE, 2), (opcodes.UNPACK_SEQUENCE, 2)]:
# normalize tuple optimization
opcode = opcodes.ROT_TWO
value = None
next_offset += 2
Expand All @@ -288,31 +189,27 @@ def add_offset(offset: Union[int, None]) -> None:
if offset is not None:
next_offsets.append(offset)

add_offset(skip(next_offset))
add_offset(self.skip(next_offset))

if opcode in hasjcond:
# this is also optimized by the python compiler if both instructions are on the same line
# see https://github.com/python/cpython/issues/100378#issuecomment-1360381947
next_offset = inst.argval
if inst.opcode in [
opcodes.JUMP_IF_TRUE_OR_POP,
opcodes.JUMP_IF_FALSE_OR_POP,
]:
opcode, next_offset = optimize_jump(inst)
opcode, next_offset = self.optimize_jump(inst)

add_offset(skip(next_offset))
# jump destination can be different because of skipped instructions
value = "xxx"
add_offset(self.skip(next_offset))

if inst.opcode in (
opcodes.SETUP_FINALLY,
opcodes.SETUP_WITH,
opcodes.SETUP_ASYNC_WITH,
opcodes.FOR_ITER,
):
add_offset(skip(inst.argval))

if inst.opcode in dis.hasjabs or inst.opcode in dis.hasjrel:
value = "???"
add_offset(self.skip(inst.argval))

if inst.opcode in end_of_block:
next_offsets = []
Expand All @@ -330,6 +227,92 @@ def add_offset(offset: Union[int, None]) -> None:
next=tuple(next_offsets),
)

def skip(self, offset: int) -> Union[int, None]:
    """
    Return the offset of the first instruction at or after `offset` that is kept.

    Instructions whose opcode is in `skip_opcodes` are stepped over and
    unconditional jumps (JUMP_FORWARD / JUMP_ABSOLUTE) are followed.
    Returns None when the end of the instruction list is reached or when
    the walk arrives at an offset it has already visited (jump cycle).
    """
    visited = set()
    position = offset

    while True:
        # instructions are indexed by offset // 2
        if position // 2 >= len(self.instructions) or position in visited:
            return None
        visited.add(position)

        current = self.instructions[position // 2]

        if current.opcode in skip_opcodes:
            # ignored instruction: continue with its successor
            position += 2
            continue

        if current.opcode in (opcodes.JUMP_FORWARD, opcodes.JUMP_ABSOLUTE):
            # unconditional jump: continue at the jump target
            position = current.argval
            continue

        return position

def next_opcode(self, offset: int) -> dis.Instruction:
    """
    Return the first "real" instruction reachable from `offset`.

    NOP and EXTENDED_ARG instructions are stepped over and unconditional
    jumps (JUMP_FORWARD / JUMP_ABSOLUTE) are followed. If the walk runs
    off the end of the instruction list or revisits an offset, the last
    instruction that was looked at is returned instead; at least one
    instruction must have been inspected (asserted).
    """
    seen = set()
    found = None
    position = offset

    while position // 2 < len(self.instructions) and position not in seen:
        seen.add(position)
        found = self.instructions[position // 2]
        code = found.opcode

        if code in (opcodes.NOP, opcodes.EXTENDED_ARG):
            # nothing to see here, look at the following instruction
            position += 2
            continue

        if code not in (opcodes.JUMP_FORWARD, opcodes.JUMP_ABSOLUTE):
            return found

        # unconditional jump: continue the search at its target
        position = found.argval

    assert found is not None
    return found

def optimize_jump(self, inst: dis.Instruction) -> Tuple[int, int]:
# Resolve a JUMP_IF_FALSE_OR_POP / JUMP_IF_TRUE_OR_POP instruction to the
# (opcode, target offset) pair it is equivalent to after following the
# instructions it jumps to (looked up with self.next_opcode).
#
# Returns a tuple (opcode, offset): opcode is the POP_JUMP_IF_* variant if a
# pop was detected along the way, otherwise the original JUMP_IF_*_OR_POP
# opcode; offset is the offset of the instruction finally reached.
# Any other opcode for `inst` trips the assert below.
#
# NOTE(review): indentation is not visible in this view, so it is unclear
# whether the two `if` statements after the `while` belong to the loop body
# or follow it -- confirm against the real file before restructuring.

# becomes True once an instruction that pops the tested value is found
pop = False
if inst.opcode == opcodes.JUMP_IF_FALSE_OR_POP:
# TOS=False
jump_or_pop = opcodes.JUMP_IF_FALSE_OR_POP
# with TOS=False these instructions pop the value and do not jump
pop_and_no_jump = [opcodes.JUMP_IF_TRUE_OR_POP, opcodes.POP_JUMP_IF_TRUE]

# with TOS=False this instruction pops the value and jumps
pop_and_jump = opcodes.POP_JUMP_IF_FALSE

elif inst.opcode == opcodes.JUMP_IF_TRUE_OR_POP:
# TOS=True
jump_or_pop = opcodes.JUMP_IF_TRUE_OR_POP
# with TOS=True these instructions pop the value and do not jump
pop_and_no_jump = [opcodes.JUMP_IF_FALSE_OR_POP, opcodes.POP_JUMP_IF_FALSE]

# with TOS=True this instruction pops the value and jumps
pop_and_jump = opcodes.POP_JUMP_IF_TRUE

else:
# only the two *_OR_POP conditional jumps are supported
assert False, inst

# a jump to the same kind of jump continues at that jump's target
while inst.opcode == jump_or_pop:
inst = self.next_opcode(inst.argval)

# the target pops and jumps again: follow it and remember the pop
if inst.opcode == pop_and_jump:
pop = True
inst = self.next_opcode(inst.argval)

# the target pops but does not jump: continue behind it and remember the pop
if inst.opcode in pop_and_no_jump:
pop = True
inst = self.next_opcode(inst.offset + 2)

return (pop_and_jump if pop else jump_or_pop, inst.offset)

def optimize_tuple(self, inst: dis.Instruction) -> Optional[Tuple[int, Any, int]]:
    """
    Fold a run of constant loads that builds a tuple into a single LOAD_CONST.

    Whether tuples are loaded with LOAD_CONST or BUILD_TUPLE depends on line numbers.
    This function performs this optimization independent of the line numbers.

    Only applied when `inst` is a LOAD_CONST and the interpreter is older
    than Python 3.10; otherwise None is returned.

    Returns a tuple (opcode, value, next_offset) where opcode is LOAD_CONST,
    value is the tuple of the collected constants and next_offset is the
    offset directly after the BUILD_TUPLE/BUILD_LIST instruction, or None
    if the instructions starting at `inst` do not have this shape.
    """
    if inst.opcode == opcodes.LOAD_CONST and not sys.version_info >= (3, 10):
        values = []
        while inst.opcode in (opcodes.LOAD_CONST, opcodes.EXTENDED_ARG):
            if inst.opcode == opcodes.LOAD_CONST:
                values.append(inst.argval)

            # instructions are indexed by offset // 2; stop when there is no
            # following instruction instead of indexing past the end of the list
            next_index = inst.offset // 2 + 1
            if next_index >= len(self.instructions):
                break
            inst = self.instructions[next_index]

        # BUILD_LIST is not 100% correct here but this saves us
        # from performance problems on 3.8 in test_extended_arg()
        if inst.opcode in (
            opcodes.BUILD_TUPLE,
            opcodes.BUILD_LIST,
        ) and inst.argval == len(values):
            # every element of the built sequence was a constant
            return (opcodes.LOAD_CONST, tuple(values), inst.offset + 2)

    return None


def merge_code_fingerprint(
code_a: CodeFingerprint, code_b: CodeFingerprint
Expand Down Expand Up @@ -419,10 +402,7 @@ def __init__(
# compile the code
# the inst.starts_line contains now the index
original_bc = compile(
cast(ast.Module, original_tree),
filename,
"exec",
dont_inherit=True,
cast(ast.Module, original_tree), filename, "exec", dont_inherit=True
)
index_bc = compile(
cast(ast.Module, index_tree), filename, "exec", dont_inherit=True
Expand Down Expand Up @@ -456,10 +436,10 @@ def handle(self, line: int) -> None:
# TODO: why None
offset_to_index[original_instr.offset] = index_instr.index

if original_instr.code_value is not None:
assert index_instr.code_value is not None
self.todo[original_instr.code_value.co_firstlineno].append(
(original_instr.code_value, index_instr.code_value)
if isinstance(original_instr.value, CodeType):
assert isinstance(index_instr.value, CodeType)
self.todo[original_instr.value.co_firstlineno].append(
(original_instr.value, index_instr.value)
)

original_instructions = original_key.instructions
Expand Down

0 comments on commit e5d175c

Please sign in to comment.