diff --git a/docs/compat.md b/docs/compat.md index 4ecc6548b..a5545c967 100644 --- a/docs/compat.md +++ b/docs/compat.md @@ -6,7 +6,7 @@ This matrix indicates the version of Python and/or GDB |:--:|:--:|:--:| | [2018.02](https://github.com/hugsy/gef/releases/tag/2018.02) | 7.2 | Python 2.7, Python 3.4 | | [2020.03](https://github.com/hugsy/gef/releases/tag/2020.03) | 7.4 | Python 2.7, Python 3.4 | -| [2022.01](https://github.com/hugsy/gef/releases/tag/2020.01) | 7.7 | Python 3.4 | +| [2022.01](https://github.com/hugsy/gef/releases/tag/2022.01) | 7.7 | Python 3.4 | | [Current](https://github.com/hugsy/gef/tree/master) | 8.0+ | Python 3.6+ | diff --git a/gef.py b/gef.py index 4fd8781ce..f7b80e0ae 100644 --- a/gef.py +++ b/gef.py @@ -58,6 +58,7 @@ import codecs import collections import ctypes +import enum import functools import hashlib import importlib @@ -152,7 +153,7 @@ def update_gef(argv): __pie_counter__ = 1 __gef_remote__ = None __gef_qemu_mode__ = False -__gef_current_arena__ = "main_arena" +# __gef_current_arena__ = "main_arena" __gef_int_stream_buffer__ = None __gef_redirect_output_fd__ = None @@ -167,6 +168,8 @@ def update_gef(argv): GDB_VERSION = tuple(map(int, re.search(r"(\d+)[^\d]+(\d+)", gdb.VERSION).groups())) PYTHON_VERSION = sys.version_info[0:2] +LIBC_HEAP_MAIN_ARENA_DEFAULT_NAME = "main_arena" + libc_args_definitions = {} highlight_table = {} @@ -176,14 +179,12 @@ def update_gef(argv): def reset_all_caches(): """Free all caches. If an object is cached, it will have a callable attribute `cache_clear` which will be invoked to purge the function cache.""" - global __gef_current_arena__ - for mod in dir(sys.modules["__main__"]): obj = getattr(sys.modules["__main__"], mod) if hasattr(obj, "cache_clear"): obj.cache_clear() - __gef_current_arena__ = "main_arena" + gef.heap.selected_arena = None return @@ -273,7 +274,201 @@ def wrapper(*args, **kwargs): return wrapper +# +# Helpers +# + +def p8(x: int, s: bool = False) -> bytes: + """Pack one byte respecting the current architecture endianness.""" + return struct.pack("{}B".format(endian_str()), x) if not s else struct.pack("{}b".format(endian_str()), x) + +def p16(x: int, s: bool = False) -> bytes: + """Pack one word respecting the current architecture endianness.""" + return struct.pack("{}H".format(endian_str()), x) if not s else struct.pack("{}h".format(endian_str()), x) + +def p32(x: int, s: bool = False) -> bytes: + """Pack one dword respecting the current architecture endianness.""" + return struct.pack("{}I".format(endian_str()), x) if not s else struct.pack("{}i".format(endian_str()), x) + +def p64(x: int, s: bool = False) -> bytes: + """Pack one qword respecting the current architecture endianness.""" + return struct.pack("{}Q".format(endian_str()), x) if not s else struct.pack("{}q".format(endian_str()), x) + +def u8(x: bytes, s: bool = False) -> int: + """Unpack one byte respecting the current architecture endianness.""" + return struct.unpack("{}B".format(endian_str()), x)[0] if not s else struct.unpack("{}b".format(endian_str()), x)[0] + +def u16(x: bytes, s: bool = False) -> int: + """Unpack one word respecting the current architecture endianness.""" + return struct.unpack("{}H".format(endian_str()), x)[0] if not s else struct.unpack("{}h".format(endian_str()), x)[0] + +def u32(x: bytes, s: bool = False) -> int: + """Unpack one dword respecting the current architecture endianness.""" + return struct.unpack("{}I".format(endian_str()), x)[0] if not s else struct.unpack("{}i".format(endian_str()), x)[0] + +def u64(x: bytes, s: bool = False) -> int: + """Unpack one qword respecting the current architecture endianness.""" + return struct.unpack("{}Q".format(endian_str()), x)[0] if not s else struct.unpack("{}q".format(endian_str()), x)[0] + + +def is_ascii_string(address): + """Helper function to determine if the buffer pointed by `address` is an ASCII string (in GDB)""" + try: + return gef.memory.read_ascii_string(address) is not None + except Exception: + return False + + +def is_alive(): + """Check if GDB is running.""" + try: + return gdb.selected_inferior().pid > 0 + except Exception: + return False + + +# +# Decorators +# + +def only_if_gdb_running(f): + """Decorator wrapper to check if GDB is running.""" + + @functools.wraps(f) + def wrapper(*args, **kwargs): + if is_alive(): + return f(*args, **kwargs) + else: + warn("No debugging session active") + + return wrapper + + +def only_if_gdb_target_local(f): + """Decorator wrapper to check if GDB is running locally (target not remote).""" + + @functools.wraps(f) + def wrapper(*args, **kwargs): + if not is_remote_debug(): + return f(*args, **kwargs) + else: + warn("This command cannot work for remote sessions.") + + return wrapper + + +def deprecated(solution): + """Decorator to add a warning when a command is obsolete and will be removed.""" + def decorator(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + warn("'{}' is deprecated and will be removed in a feature release.".format(f.__name__)) + warn(solution) + return f(*args, **kwargs) + return wrapper + return decorator + + +def experimental_feature(f): + """Decorator to add a warning when a feature is experimental.""" + + @functools.wraps(f) + def wrapper(*args, **kwargs): + warn("This feature is under development, expect bugs and unstability...") + return f(*args, **kwargs) + + return wrapper + + +def only_if_gdb_version_higher_than(required_gdb_version): + """Decorator to check whether current GDB version requirements.""" + + def wrapper(f): + def inner_f(*args, **kwargs): + if GDB_VERSION >= required_gdb_version: + f(*args, **kwargs) + else: + reason = "GDB >= {} for this command".format(required_gdb_version) + raise EnvironmentError(reason) + return inner_f + return wrapper + +def only_if_current_arch_in(valid_architectures): + """Decorator to allow commands for only a subset of the architectured supported by GEF. + This decorator is to use lightly, as it goes against the purpose of GEF to support all + architectures GDB does. However in some cases, it is necessary.""" + + def wrapper(f): + def inner_f(*args, **kwargs): + if gef.arch in valid_architectures: + f(*args, **kwargs) + else: + reason = "This command cannot work for the '{}' architecture".format(gef.arch.arch) + raise EnvironmentError(reason) + return inner_f + return wrapper + + +def FakeExit(*args, **kwargs): + raise RuntimeWarning + +sys.exit = FakeExit + +def parse_arguments(required_arguments, optional_arguments): + """Argument parsing decorator.""" + + def int_wrapper(x): return int(x, 0) + + def decorator(f): + def wrapper(*args, **kwargs): + parser = argparse.ArgumentParser(prog=args[0]._cmdline_, add_help=True) + for argname in required_arguments: + argvalue = required_arguments[argname] + argtype = type(argvalue) + if argtype is int: + argtype = int_wrapper + + argname_is_list = isinstance(argname, list) or isinstance(argname, tuple) + if not argname_is_list and argname.startswith("-"): + # optional args + if argtype is bool: + parser.add_argument(argname, action="store_true" if argvalue else "store_false") + else: + parser.add_argument(argname, type=argtype, required=True, default=argvalue) + else: + if argtype in (list, tuple): + nargs = '*' + argtype = type(argvalue[0]) + else: + nargs = '?' + # positional args + parser.add_argument(argname, type=argtype, default=argvalue, nargs=nargs) + + for argname in optional_arguments: + argname_is_list = isinstance(argname, list) or isinstance(argname, tuple) + if not argname_is_list and not argname.startswith("-"): + # refuse positional arguments + continue + argvalue = optional_arguments[argname] + argtype = type(argvalue) + if not argname_is_list: + argname = [argname,] + if argtype is int: + argtype = int_wrapper + if argtype is bool: + parser.add_argument(*argname, action="store_true" if argvalue else "store_false") + else: + parser.add_argument(*argname, type=argtype, default=argvalue) + + try: + parsed_args = parser.parse_args(*(args[1:])) + except RuntimeWarning: + return + kwargs["arguments"] = parsed_args + return f(*args, **kwargs) + return wrapper + return decorator class Color: """Used to colorify terminal output.""" colors = { @@ -466,6 +661,14 @@ def realpath(self): Zone = collections.namedtuple("Zone", ["name", "zone_start", "zone_end", "filename"]) +class Endianness(enum.Enum): + LITTLE_ENDIAN = 1 + BIG_ENDIAN = 2 + + def __str__(self): + if self == Endianness.LITTLE_ENDIAN: + return "<" + return ">" class Elf: """Basic ELF parsing. @@ -474,9 +677,6 @@ class Elf: - http://refspecs.freestandards.org/elf/elfspec_ppc.pdf - http://refspecs.linuxfoundation.org/ELF/ppc64/PPC-elf64abi.html """ - LITTLE_ENDIAN = 1 - BIG_ENDIAN = 2 - ELF_32_BITS = 0x01 ELF_64_BITS = 0x02 ELF_MAGIC = 0x7f454c46 @@ -510,7 +710,7 @@ class Elf: e_magic = ELF_MAGIC e_class = ELF_32_BITS - e_endianness = LITTLE_ENDIAN + e_endianness = Endianness.LITTLE_ENDIAN e_eiversion = None e_osabi = None e_abiversion = None @@ -781,7 +981,6 @@ def is_valid(self): @lru_cache() def search_for_main_arena(): - global __gef_current_arena__ malloc_hook_addr = parse_address("(void *)&__malloc_hook") if is_x86(): @@ -791,7 +990,7 @@ def search_for_main_arena(): else: raise OSError("Cannot find main_arena for {}".format(gef.arch.arch)) - __gef_current_arena__ = "*0x{:x}".format(addr) + addr = "*0x{:x}".format(addr) return addr @@ -805,6 +1004,8 @@ def __init__(self, addr): warn("Could not parse address '&{}' when searching malloc_state struct, " "using '&main_arena' instead".format(addr)) self.__addr = search_for_main_arena() + # if `search_for_main_arena` throws `gdb.error` on symbol lookup: it means the session is not started + # so just propagate the exception self.num_fastbins = 10 self.num_bins = 254 @@ -968,8 +1169,8 @@ class GlibcArena: """Glibc arena class Ref: https://github.com/sploitfun/lsploits/blob/master/glibc/malloc/malloc.c#L1671""" - def __init__(self, addr, name=None): - self.__name = name or __gef_current_arena__ + def __init__(self, addr): + # self.__name = name or __gef_current_arena__ try: arena = gdb.parse_and_eval(addr) malloc_state_t = cached_lookup_type("struct malloc_state") @@ -999,10 +1200,22 @@ def __int__(self): return self.__addr def __iter__(self): - arena = self - while arena is not None: - yield arena - arena = arena.get_next() + return self + + def __next__(self): + # arena = self + # while arena is not None: + # yield arena + # arena = arena.get_next() + next_arena_address = int(self.next) + # arena_main = GlibcArena(self.__name) + if next_arena_address == int(gef.heap.main_arena): + raise StopIteration + return GlibcArena("*{:#x} ".format(next_arena_address)) + + def __eq__(self, other): + # You cannot have 2 arenas at the same address, so this check should be enough + return self.__addr == int(self) def fastbin(self, i): """Return head chunk in fastbinsY[i].""" @@ -1017,15 +1230,16 @@ def bin(self, i): bw = int(self.bins[idx + 1]) return fd, bw - def get_next(self): - addr_next = int(self.next) - arena_main = GlibcArena(self.__name) - if addr_next == arena_main.__addr: - return None - return GlibcArena("*{:#x} ".format(addr_next)) + # def get_next(self): + # addr_next = int(self.next) + # arena_main = GlibcArena(self.__name) + # if addr_next == arena_main.__addr: + # return None + # return GlibcArena("*{:#x} ".format(addr_next)) + @deprecated("use `==` operator instead") def is_main_arena(self): - return int(self) == parse_address("&main_arena") + return int(self) == int(gef.heap.main_arena) def heap_addr(self, allow_unaligned=False): if self.is_main_arena(): @@ -1109,6 +1323,12 @@ def usable_size(self): def get_prev_chunk_size(self): return gef.memory.read_integer(self.prev_size_addr) + def __iter__(self): + return self + + def __next__(self): + return self.get_next_chunk() + def get_next_chunk(self, allow_unaligned=False): addr = self.get_next_chunk_addr() return GlibcChunk(addr, allow_unaligned=allow_unaligned) @@ -1264,19 +1484,20 @@ def get_libc_version(): return 0, 0 -@lru_cache() -def get_glibc_arena(addr=None): - try: - addr = "*{}".format(addr) if addr else __gef_current_arena__ - return GlibcArena(addr) - except Exception as e: - err("Failed to get the glibc arena, heap commands may not work properly: {}".format(e)) - return None +# @lru_cache() +# def get_glibc_arena(addr=None): +# try: +# addr = "*{}".format(addr) if addr else __gef_current_arena__ +# return GlibcArena(addr) +# except Exception as e: +# err("Failed to get the glibc arena, heap commands may not work properly: {}".format(e)) +# return None +# def get_glibc_arenas(addr=None): +# return iter( GlibcArena(addr) ) -def get_glibc_arenas(addr=None, get_all=True): - arena = get_glibc_arena(addr) - return [arena for arena in iter(arena)] if get_all else [arena] +# def get_first_arena(addr=None): +# return next( get_glibc_arenas(addr) ) def titlify(text, color=None, msg_color=None): @@ -1766,17 +1987,17 @@ def get_arch(): return arch_str -@lru_cache() -def get_endian(): - """Return the binary endianness.""" +# @lru_cache() +# def get_endian(): +# """Return the binary endianness.""" - endian = gdb.execute("show endian", to_string=True).strip().lower() - if "little endian" in endian: - return Elf.LITTLE_ENDIAN - if "big endian" in endian: - return Elf.BIG_ENDIAN +# endian = gdb.execute("show endian", to_string=True).strip().lower() +# if "little endian" in endian: +# return Elf.LITTLE_ENDIAN +# if "big endian" in endian: +# return Elf.BIG_ENDIAN - raise EnvironmentError("Invalid endianness") +# raise EnvironmentError("Invalid endianness") @lru_cache() @@ -1794,8 +2015,14 @@ def is_pie(fpath): return checksec(fpath)["PIE"] -def is_big_endian(): return get_endian() == Elf.BIG_ENDIAN -def is_little_endian(): return not is_big_endian() +@deprecated("Prefer `gef.arch.endianness == Endianness.BIG_ENDIAN`") +def is_big_endian(): + return gef.arch.endianness == Endianness.BIG_ENDIAN + + +@deprecated("gef.arch.endianness == Endianness.LITTLE_ENDIAN") +def is_little_endian(): + return gef.arch.endianness == Endianness.LITTLE_ENDIAN def flags_to_human(reg_value, value_table): @@ -1807,6 +2034,10 @@ def flags_to_human(reg_value, value_table): return "[{}]".format(" ".join(flags)) +# +# Architecture classes +# + class Architecture(metaclass=abc.ABCMeta): """Generic metaclass for the architecture supported by GEF.""" @@ -1851,9 +2082,29 @@ def sp(self): def fp(self): return get_register("$fp") + __ptrsize = None @property def ptrsize(self): - return get_memory_alignment() + if not self.__ptrsize: + res = cached_lookup_type("size_t") + if res is not None: + self.__ptrsize = res.sizeof + else: + self.__ptrsize = gdb.parse_and_eval("$pc").type.sizeof + return self.__ptrsize + + __endianness = None + @property + def endianness(self) -> Endianness: + if not self.__endianness: + output = gdb.execute("show endian", to_string=True).strip().lower() + if "little endian" in output: + self.__endianness = Endianness.LITTLE_ENDIAN + elif "big endian" in output: + self.__endianness = Endianness.BIG_ENDIAN + else: + raise EnvironmentError(f"No valid endianess found in '{output}'") + return self.__endianness def get_ith_parameter(self, i, in_func=True): """Retrieves the correct parameter used for the current function call.""" @@ -1863,6 +2114,26 @@ def get_ith_parameter(self, i, in_func=True): return key, val +class GenericArchitecture(Architecture): + arch = "Generic" + mode = "" + all_registers = () + instruction_length = 0 + return_register = "" + function_parameters = () + syscall_register = "" + syscall_instructions = () + nop_insn = b"" + flag_register = None + flags_table = None + def flag_register_to_human(self, val=None): raise NotImplemented + def is_call(self, insn): raise NotImplemented + def is_ret(self, insn): raise NotImplemented + def is_conditional_branch(self, insn): raise NotImplemented + def is_branch_taken(self, insn): raise NotImplemented + def get_ra(self, insn, frame): raise NotImplemented + + class RISCV(Architecture): arch = "RISCV" mode = "RISCV" @@ -2706,198 +2977,8 @@ def mprotect_asm(cls, addr, size, perm): return "; ".join(insns) -# -# Helpers -# - -def p8(x: int, s: bool = False) -> bytes: - """Pack one byte respecting the current architecture endianness.""" - return struct.pack("{}B".format(endian_str()), x) if not s else struct.pack("{}b".format(endian_str()), x) - -def p16(x: int, s: bool = False) -> bytes: - """Pack one word respecting the current architecture endianness.""" - return struct.pack("{}H".format(endian_str()), x) if not s else struct.pack("{}h".format(endian_str()), x) - -def p32(x: int, s: bool = False) -> bytes: - """Pack one dword respecting the current architecture endianness.""" - return struct.pack("{}I".format(endian_str()), x) if not s else struct.pack("{}i".format(endian_str()), x) - -def p64(x: int, s: bool = False) -> bytes: - """Pack one qword respecting the current architecture endianness.""" - return struct.pack("{}Q".format(endian_str()), x) if not s else struct.pack("{}q".format(endian_str()), x) - -def u8(x: bytes, s: bool = False) -> int: - """Unpack one byte respecting the current architecture endianness.""" - return struct.unpack("{}B".format(endian_str()), x)[0] if not s else struct.unpack("{}b".format(endian_str()), x)[0] - -def u16(x: bytes, s: bool = False) -> int: - """Unpack one word respecting the current architecture endianness.""" - return struct.unpack("{}H".format(endian_str()), x)[0] if not s else struct.unpack("{}h".format(endian_str()), x)[0] - -def u32(x: bytes, s: bool = False) -> int: - """Unpack one dword respecting the current architecture endianness.""" - return struct.unpack("{}I".format(endian_str()), x)[0] if not s else struct.unpack("{}i".format(endian_str()), x)[0] - -def u64(x: bytes, s: bool = False) -> int: - """Unpack one qword respecting the current architecture endianness.""" - return struct.unpack("{}Q".format(endian_str()), x)[0] if not s else struct.unpack("{}q".format(endian_str()), x)[0] - - -def is_ascii_string(address): - """Helper function to determine if the buffer pointed by `address` is an ASCII string (in GDB)""" - try: - return gef.memory.read_ascii_string(address) is not None - except Exception: - return False - - -def is_alive(): - """Check if GDB is running.""" - try: - return gdb.selected_inferior().pid > 0 - except Exception: - return False - return False - - -def only_if_gdb_running(f): - """Decorator wrapper to check if GDB is running.""" - - @functools.wraps(f) - def wrapper(*args, **kwargs): - if is_alive(): - return f(*args, **kwargs) - else: - warn("No debugging session active") - - return wrapper - - -def only_if_gdb_target_local(f): - """Decorator wrapper to check if GDB is running locally (target not remote).""" - - @functools.wraps(f) - def wrapper(*args, **kwargs): - if not is_remote_debug(): - return f(*args, **kwargs) - else: - warn("This command cannot work for remote sessions.") - - return wrapper - - -def deprecated(f): - """Decorator to add a warning when a command is obsolete and will be removed.""" - - @functools.wraps(f) - def wrapper(*args, **kwargs): - warn("'{}' is obsolete and will be removed in a feature release.".format(f.__name__)) - return f(*args, **kwargs) - - return wrapper - - -def experimental_feature(f): - """Decorator to add a warning when a feature is experimental.""" - - @functools.wraps(f) - def wrapper(*args, **kwargs): - warn("This feature is under development, expect bugs and unstability...") - return f(*args, **kwargs) - - return wrapper - - -def only_if_gdb_version_higher_than(required_gdb_version): - """Decorator to check whether current GDB version requirements.""" - - def wrapper(f): - def inner_f(*args, **kwargs): - if GDB_VERSION >= required_gdb_version: - f(*args, **kwargs) - else: - reason = "GDB >= {} for this command".format(required_gdb_version) - raise EnvironmentError(reason) - return inner_f - return wrapper - - -def only_if_current_arch_in(valid_architectures): - """Decorator to allow commands for only a subset of the architectured supported by GEF. - This decorator is to use lightly, as it goes against the purpose of GEF to support all - architectures GDB does. However in some cases, it is necessary.""" - - def wrapper(f): - def inner_f(*args, **kwargs): - if gef.arch in valid_architectures: - f(*args, **kwargs) - else: - reason = "This command cannot work for the '{}' architecture".format(gef.arch.arch) - raise EnvironmentError(reason) - return inner_f - return wrapper -def FakeExit(*args, **kwargs): - raise RuntimeWarning - -sys.exit = FakeExit - -def parse_arguments(required_arguments, optional_arguments): - """Argument parsing decorator.""" - - def int_wrapper(x): return int(x, 0) - - def decorator(f): - def wrapper(*args, **kwargs): - parser = argparse.ArgumentParser(prog=args[0]._cmdline_, add_help=True) - for argname in required_arguments: - argvalue = required_arguments[argname] - argtype = type(argvalue) - if argtype is int: - argtype = int_wrapper - - argname_is_list = isinstance(argname, list) or isinstance(argname, tuple) - if not argname_is_list and argname.startswith("-"): - # optional args - if argtype is bool: - parser.add_argument(argname, action="store_true" if argvalue else "store_false") - else: - parser.add_argument(argname, type=argtype, required=True, default=argvalue) - else: - if argtype in (list, tuple): - nargs = '*' - argtype = type(argvalue[0]) - else: - nargs = '?' - # positional args - parser.add_argument(argname, type=argtype, default=argvalue, nargs=nargs) - - for argname in optional_arguments: - argname_is_list = isinstance(argname, list) or isinstance(argname, tuple) - if not argname_is_list and not argname.startswith("-"): - # refuse positional arguments - continue - argvalue = optional_arguments[argname] - argtype = type(argvalue) - if not argname_is_list: - argname = [argname,] - if argtype is int: - argtype = int_wrapper - if argtype is bool: - parser.add_argument(*argname, action="store_true" if argvalue else "store_false") - else: - parser.add_argument(*argname, type=argtype, default=argvalue) - - try: - parsed_args = parser.parse_args(*(args[1:])) - except RuntimeWarning: - return - kwargs["arguments"] = parsed_args - return f(*args, **kwargs) - return wrapper - return decorator - def copy_to_clipboard(data): """Helper function to submit data to the clipboard""" @@ -3623,25 +3704,25 @@ def get_elf_headers(filename=None): return Elf(filename) -def _ptr_width(): - void = cached_lookup_type("void") - if void is None: - uintptr_t = cached_lookup_type("uintptr_t") - return uintptr_t.sizeof - else: - return void.pointer().sizeof +# def _ptr_width(): +# void = cached_lookup_type("void") +# if void is None: +# uintptr_t = cached_lookup_type("uintptr_t") +# return uintptr_t.sizeof +# else: +# return void.pointer().sizeof @lru_cache() def is_64bit(): """Checks if current target is 64bit.""" - return _ptr_width() == 8 + return gef.arch.ptrsize == 8 @lru_cache() def is_32bit(): """Checks if current target is 32bit.""" - return _ptr_width() == 4 + return gef.arch.ptrsize == 4 @lru_cache() @@ -3721,7 +3802,7 @@ def cached_lookup_type(_type): return None -@lru_cache() +@deprecated("Use `gef.arch.ptrsize` instead") def get_memory_alignment(in_bits=False): """Try to determine the size of a pointer on this system. First, try to parse it out of the ELF header. @@ -3729,11 +3810,6 @@ def get_memory_alignment(in_bits=False): Finally, try the size of $pc. If `in_bits` is set to True, the result is returned in bits, otherwise in bytes.""" - if is_32bit(): - return 4 if not in_bits else 32 - elif is_64bit(): - return 8 if not in_bits else 64 - res = cached_lookup_type("size_t") if res is not None: return res.sizeof if not in_bits else res.sizeof * 8 @@ -3742,6 +3818,7 @@ def get_memory_alignment(in_bits=False): return gdb.parse_and_eval("$pc").type.sizeof except: pass + raise EnvironmentError("GEF is running under an unsupported mode") @@ -3830,13 +3907,12 @@ def parse_address(address): def is_in_x86_kernel(address): address = align_address(address) - memalign = get_memory_alignment(in_bits=True) - 1 + memalign = gef.arch.ptrsize*8 - 1 return (address >> memalign) == 0xF - -@lru_cache() +@deprecated("Use `str(gef.arch.endianness)` instead") def endian_str(): - return "<" if is_little_endian() else ">" + return str(gef.arch.endianness) @lru_cache() @@ -4582,7 +4658,7 @@ def settings(self): """Return the list of settings for this command.""" return list(iter(self)) - @deprecated + @deprecated("") def get_setting(self, name): return self.__getitem__(name) @@ -4590,14 +4666,14 @@ def __getitem__(self, name): key = self.__get_setting_name(name) return gef.config[key] - @deprecated + @deprecated("") def has_setting(self, name): return self.__contains__(name) def __contains__(self, name): return self.__get_setting_name(name) in gef.config - @deprecated + @deprecated("") def add_setting(self, name, value, description=""): return self.__setitem__(name, (value, type(value), description)) @@ -4615,7 +4691,7 @@ def __setitem__(self, name, value): gef.config[key] = GefSetting(value[0], description=value[1]) return - @deprecated + @deprecated("") def del_setting(self, name): return self.__delitem__(name) @@ -4672,14 +4748,8 @@ def do_invoke(self, argv): class PrintFormatCommand(GenericCommand): """Print bytes format in high level languages.""" - format_matrix = { - 8: (endian_str() + "B", "char", "db"), - 16: (endian_str() + "H", "short", "dw"), - 32: (endian_str() + "I", "int", "dd"), - 64: (endian_str() + "Q", "long long", "dq"), - } - - valid_formats = ["py", "c", "js", "asm"] + valid_formats = ("py", "c", "js", "asm") + valid_bitness = (8, 16, 32, 64) _cmdline_ = "print-format" _aliases_ = ["pf",] @@ -4688,7 +4758,7 @@ class PrintFormatCommand(GenericCommand): \t--bitlen SIZE specifies size of bit (possible values: {}, default is 8). \t--length LENGTH specifies length of array (default is 256). \t--clip The output data will be copied to clipboard -\tLOCATION specifies where the address of bytes is stored.""".format(_cmdline_, str(valid_formats), str(format_matrix.keys())) +\tLOCATION specifies where the address of bytes is stored.""".format(_cmdline_, str(valid_formats), str(valid_bitness)) _example_ = "{} --lang py -l 16 $rsp".format(_cmdline_) @@ -4696,6 +4766,16 @@ def __init__(self): super().__init__(complete=gdb.COMPLETE_LOCATION) return + @property + def format_matrix(self): + # `endian_str()` is a runtime property, should not be defined as a class property + return { + 8: (endian_str() + "B", "char", "db"), + 16: (endian_str() + "H", "short", "dw"), + 32: (endian_str() + "I", "int", "dd"), + 64: (endian_str() + "Q", "long long", "dq"), + } + @only_if_gdb_running @parse_arguments({"location": "$pc", }, {("--length", "-l"): 256, "--bitlen": 0, "--lang": "py", "--clip": True,}) def do_invoke(self, argv, *args, **kwargs): @@ -5711,7 +5791,7 @@ def disconnect(self): self.sock = None return - @deprecated + @deprecated("") def do_invoke(self, argv): def parsed_arglist(arglist): args = [] @@ -6861,7 +6941,7 @@ class GlibcHeapSetArenaCommand(GenericCommand): """Display information on a heap chunk.""" _cmdline_ = "heap set-arena" - _syntax_ = "{:s} address".format(_cmdline_) + _syntax_ = "{:s} [address|symbol]".format(_cmdline_) _example_ = "{:s} 0x001337001337".format(_cmdline_) def __init__(self): @@ -6870,26 +6950,31 @@ def __init__(self): @only_if_gdb_running def do_invoke(self, argv): - global __gef_current_arena__ + global gef if not argv: - ok("Current arena set to: '{}'".format(__gef_current_arena__)) + ok("Current arena set to: '{}'".format(gef.heap.selected_arena)) return - new_arena = safe_parse_and_eval(argv[0]) - if new_arena is None: + if is_hex(argv[0]): + new_arena_address = argv[0] + else: + new_arena_symbol = safe_parse_and_eval(argv[0]) + if not new_arena_symbol: + err("Invalid symbol for arena") + return + + new_arena_address = Address(value=to_unsigned_long(new_arena_symbol)) + if not new_arena_address or not new_arena_address.valid: err("Invalid address") return - if argv[0].startswith("0x"): - new_arena = Address(value=to_unsigned_long(new_arena)) - if new_arena is None or not new_arena.valid: - err("Invalid address") - return + new_arena = GlibcArena(f"*{new_arena_address:#x}") + if new_arena not in gef.heap.arenas: + err("Invalid arena") + return - __gef_current_arena__ = "*{:s}".format(format_address(new_arena.value)) - else: - __gef_current_arena__ = argv[0] + gef.heap.selected_arena = new_arena return @@ -6902,8 +6987,9 @@ class GlibcHeapArenaCommand(GenericCommand): @only_if_gdb_running def do_invoke(self, argv): - arenas = get_glibc_arenas() - for arena in arenas: + # arenas = get_glibc_arenas() + for arena in gef.heap.arenas: + print("foo") gef_print(str(arena)) return @@ -6971,9 +7057,12 @@ def __init__(self): def do_invoke(self, *args, **kwargs): args = kwargs["arguments"] - arenas = get_glibc_arenas(addr=args.arena_address, get_all=args.all) + # arenas = get_glibc_arenas(addr=args.arena_address, get_all=args.all) + arenas = gef.heap.arenas for arena in arenas: self.dump_chunks_arena(arena, print_arena=args.all, allow_unaligned=args.allow_unaligned) + if not args.all: + break def dump_chunks_arena(self, arena, print_arena=False, allow_unaligned=False): top_chunk_addr = arena.top @@ -7334,11 +7423,11 @@ def __init__(self): @only_if_gdb_running def do_invoke(self, argv): - if get_glibc_arena() is None: - err("Invalid Glibc arena") + if not gef.heap.main_arena: + err("Heap not initialized") return - arena_addr = "*{:s}".format(argv[0]) if len(argv) == 1 else __gef_current_arena__ + arena = GlibcArena(f"*{argv[0]:s}") if len(argv) == 1 else gef.heap.selected_arena gef_print(titlify("Small Bins for arena '{:s}'".format(arena_addr))) bins = {} for i in range(1, 63): @@ -11229,8 +11318,8 @@ def __gef_prompt__(current_prompt): return GEF_PROMPT_OFF -class GefMemory: - """Class that describes memory access.""" +class GefMemoryManager: + """Class that manages memory access for gef.""" def write(self, address, buffer, length=0x10): """Write `buffer` at address `address`.""" @@ -11274,7 +11363,6 @@ def read_cstring(self, address, max_length=GEF_MAX_STRING_LENGTH, encoding=None) return ustr - def read_ascii_string(self, address): """Read an ASCII string from memory""" cstr = self.read_cstring(address) @@ -11282,14 +11370,66 @@ def read_ascii_string(self, address): return cstr return None +class GefHeapManager: + + def __init__(self): + self.__libc_main_arena = None + self.__libc_selected_arena = None + self.__heap_base = None + return + + @property + def main_arena(self): + if not self.__libc_main_arena: + try: + self.__libc_main_arena = GlibcArena(search_for_main_arena()) + # the initialization of `main_arena` also defined `selected_arena`, so + # by default, `main_arena` == `selected_arena` + self.selected_arena = self.__libc_main_arena + except: + # the search for arena can fail when the session is not started + pass + return self.__libc_main_arena + + @property + def selected_arena(self): + if not self.__libc_selected_arena: + # `selected_arena` must default to `main_arena` + self.__libc_selected_arena = self.__libc_default_arena + return self.__libc_selected_arena + + @selected_arena.setter + def selected_arena(self, value): + self.__libc_selected_arena = value + return + + @property + def arenas(self): + if not self.main_arena: + return [] + return iter(self.__libc_main_arena) + + @property + def base_address(self): + if not self.__heap_base: + self.__heap_base = HeapBaseFunction.heap_base() + return self.__heap_base + + @property + def chunks(self): + if not self.base_address: + return [] + return iter( GlibcChunk(self.base_address) ) + + class GefSetting: def __init__(self, value, cls = None, description = None): self.value = value self.type = cls or type(value) self.description = description or "" + return - -class GefSettings(dict): +class GefSettingsManager(dict): """ GefSettings acts as a dict where the global settings are stored and can be read, written or deleted as any other dict. For instance, to read a specific command setting: `gef.config[mycommand.mysetting]` @@ -11323,16 +11463,24 @@ def raw_entry(self, name): return dict.__getitem__(self, name) +class GefSessionManager: + def __init__(self): + pass + class Gef: """The GEF root class""" def __init__(self): self.binary = None - self.arch = None - self.memory = GefMemory() - self.config = GefSettings() + self.arch = GenericArchitecture() # see PR #516, will be reset by `new_objfile_handler` + self.config = GefSettingsManager() return def setup(self): + """ + Setup initialize the runtime setup, which may require for the `gef` to be not None + """ + self.memory = GefMemoryManager() + self.heap = GefHeapManager() self.instance = GefCommand() self.instance.setup() tempdir = self.config["gef.tempdir"] @@ -11341,6 +11489,8 @@ def setup(self): return + + if __name__ == "__main__": if sys.version_info[0] == 2: err("GEF has dropped Python2 support for GDB when it reached EOL on 2020/01/01.") @@ -11377,35 +11527,29 @@ def setup(self): gdb.prompt_hook = __gef_prompt__ # setup config - gdb.execute("set confirm off") - gdb.execute("set verbose off") - gdb.execute("set pagination off") - gdb.execute("set print elements 0") - - # gdb history - gdb.execute("set history save on") - gdb.execute("set history filename ~/.gdb_history") - - # gdb input and output bases - gdb.execute("set output-radix 0x10") - - # pretty print - gdb.execute("set print pretty on") - - try: - # this will raise a gdb.error unless we're on x86 - gdb.execute("set disassembly-flavor intel") - except gdb.error: - # we can safely ignore this - pass - - # SIGALRM will simply display a message, but gdb won't forward the signal to the process - gdb.execute("handle SIGALRM print nopass") + gdb_initial_config = ( + "set confirm off", + "set verbose off", + "set pagination off", + "set print elements 0", + "set history save on", + "set history filename ~/.gdb_history", + "set output-radix 0x10", + "set print pretty on", + "set disassembly-flavor intel", + "handle SIGALRM print nopass", + ) + for cmd in gdb_initial_config: + try: + gdb.execute(cmd) + except gdb.error: + pass # load GEF gef = Gef() gef.setup() + print(gef.arch) # gdb events configuration gef_on_continue_hook(continue_handler) gef_on_stop_hook(hook_stop_handler)