From d5a3bb6185e30419c8380bf1d2fb49692cc35439 Mon Sep 17 00:00:00 2001 From: hugsy Date: Sat, 8 Jan 2022 14:50:21 -0800 Subject: [PATCH] changed path to `pathlib.Path` (#775) added test for `download_file` --- gef.py | 185 +++++++++++++++++++++------------------------- tests/helpers.py | 13 ++++ tests/runtests.py | 23 +++--- 3 files changed, 109 insertions(+), 112 deletions(-) diff --git a/gef.py b/gef.py index 787cf1ec6..bc16e1ff0 100644 --- a/gef.py +++ b/gef.py @@ -1607,20 +1607,11 @@ def gef_pybytes(x): @lru_cache() def which(program): """Locate a command on the filesystem.""" - - def is_exe(fpath): - return os.path.isfile(fpath) and os.access(fpath, os.X_OK) - - fpath = os.path.split(program)[0] - if fpath: - if is_exe(program): - return program - else: - for path in os.environ["PATH"].split(os.pathsep): - path = path.strip('"') - exe_file = os.path.join(path, program) - if is_exe(exe_file): - return exe_file + for path in os.environ["PATH"].split(os.pathsep): + dirname = pathlib.Path(path) + fpath = dirname / program + if os.access(fpath, os.X_OK): + return fpath raise FileNotFoundError("Missing file `{:s}`".format(program)) @@ -1735,14 +1726,10 @@ def disable_redirect_output(): def gef_makedirs(path, mode=0o755): """Recursive mkdir() creation. If successful, return the absolute path of the directory created.""" - abspath = os.path.expanduser(path) - abspath = os.path.realpath(abspath) - - if os.path.isdir(abspath): - return abspath - - os.makedirs(abspath, mode=mode, exist_ok=True) - return abspath + fpath = pathlib.Path(path) + if not fpath.is_dir(): + fpath.mkdir(mode=mode, exist_ok=True, parents=True) + return fpath.absolute() @lru_cache() @@ -1938,9 +1925,11 @@ def gef_execute_gdb_script(commands): with os.fdopen(fd, "w") as f: f.write(commands) f.flush() - if os.access(fname, os.R_OK): + + fname = pathlib.Path(fname) + if fname.is_file() and os.access(fname, os.R_OK): gdb.execute("source {:s}".format(fname)) - os.unlink(fname) + fname.unlink() return @@ -3182,29 +3171,28 @@ def get_filepath(): return get_path_from_info_proc() -def download_file(target, use_cache=False, local_name=None): - """Download filename `target` inside the mirror tree inside the gef.config["gef.tempdir"]. +def download_file(remote_path, use_cache=False, local_name=None): + """Download filename `remote_path` inside the mirror tree inside the gef.config["gef.tempdir"]. The tree architecture must be gef.config["gef.tempdir"]/gef//. This allow a "chroot-like" tree format.""" try: - local_root = os.path.sep.join([gef.config["gef.tempdir"], str(gef.session.pid)]) + local_root = pathlib.Path(gef.config["gef.tempdir"]) / str(gef.session.pid) if local_name is None: - local_path = os.path.sep.join([local_root, os.path.dirname(target)]) - local_name = os.path.sep.join([local_path, os.path.basename(target)]) + local_path = local_root / remote_path.strip(os.sep) else: - local_path = os.path.sep.join([local_root, os.path.dirname(local_name)]) - local_name = os.path.sep.join([local_path, os.path.basename(local_name)]) + local_path = local_root / local_name.strip(os.sep) - if use_cache and os.access(local_name, os.R_OK): - return local_name + if use_cache and local_path.exists(): + return str(local_path.absolute()) - gef_makedirs(local_path) - gdb.execute("remote get {0:s} {1:s}".format(target, local_name)) + local_path.parent.mkdir(parents=True, exist_ok=True) + gdb.execute("remote get {0:s} {1:s}".format(remote_path, str(local_path.absolute()))) + local_path = str(local_path.absolute()) except gdb.error: # fallback memory view - with open(local_name, "w") as f: + with open(local_path, "w") as f: if is_32bit(): f.write("00000000-ffffffff rwxp 00000000 00:00 0 {}\n".format(get_filepath())) else: @@ -3213,7 +3201,8 @@ def download_file(target, use_cache=False, local_name=None): except Exception as e: err("download_file() failed: {}".format(str(e))) local_name = None - return local_name + + return local_path def get_function_length(sym): @@ -4580,9 +4569,9 @@ class VersionCommand(GenericCommand): _example_ = "{:s}".format(_cmdline_) def do_invoke(self, argv): - gef_fpath = os.path.abspath(os.path.expanduser(inspect.stack()[0][1])) - gef_dir = os.path.dirname(gef_fpath) - with open(gef_fpath, "rb") as f: + gef_fpath = pathlib.Path(inspect.stack()[0][1]).expanduser().absolute() + gef_dir = gef_fpath.parent + with gef_fpath.open("rb") as f: gef_hash = hashlib.sha256(f.read()).hexdigest() if os.access("{}/.git".format(gef_dir), os.X_OK): @@ -5801,8 +5790,8 @@ def import_structures(self, structs): gef_makedirs(path) for struct_name in structs: - fullpath = os.path.join(path, "{}.py".format(struct_name)) - with open(fullpath, "w") as f: + fullpath = pathlib.Path(path) / "{}.py".format(struct_name) + with fullpath.open("w") as f: f.write("from ctypes import *\n\n") f.write("class ") f.write(struct_name) @@ -7730,7 +7719,7 @@ class ProcessListingCommand(GenericCommand): def __init__(self): super().__init__(complete=gdb.COMPLETE_LOCATION) - self["ps_command"] = ( "{:s} auxww".format(gef.session.constants["ps"]), "`ps` command to get process information") + self["ps_command"] = ( "{:s} auxww".format(str(gef.session.constants["ps"])), "`ps` command to get process information") return @parse_arguments({"pattern": ""}, {"--attach": True, "--smart-scan": True}) @@ -11533,65 +11522,63 @@ def reset_caches(self): "Consider updating to GDB {} or higher (with Python {} or higher).".format(".".join(map(str, GDB_MIN_VERSION)), ".".join(map(str, PYTHON_MIN_VERSION)))) exit(1) - else: + try: + pyenv = which("pyenv") + PYENV_ROOT = gef_pystring(subprocess.check_output([pyenv, "root"]).strip()) + PYENV_VERSION = gef_pystring(subprocess.check_output([pyenv, "version-name"]).strip()) + site_packages_dir = os.path.join(PYENV_ROOT, "versions", PYENV_VERSION, "lib", + "python{}".format(PYENV_VERSION[:3]), "site-packages") + site.addsitedir(site_packages_dir) + except FileNotFoundError: + pass + + # When using a Python virtual environment, GDB still loads the system-installed Python + # so GEF doesn't load site-packages dir from environment + # In order to fix it, from the shell with venv activated we run the python binary, + # take and parse its path, add the path to the current python process using sys.path.extend + PYTHONBIN = which("python3") + PREFIX = gef_pystring(subprocess.check_output([PYTHONBIN, '-c', 'import os, sys;print((sys.prefix))'])).strip("\\n") + if PREFIX != sys.base_prefix: + SITE_PACKAGES_DIRS = subprocess.check_output( + [PYTHONBIN, "-c", "import os, sys;print(os.linesep.join(sys.path).strip())"]).decode("utf-8").split() + sys.path.extend(SITE_PACKAGES_DIRS) + + # setup prompt + gdb.prompt_hook = __gef_prompt__ + + # setup config + gdb_initial_settings = ( + "set confirm off", + "set verbose off", + "set pagination off", + "set print elements 0", + "set history save on", + "set history filename ~/.gdb_history", + "set output-radix 0x10", + "set print pretty on", + "set disassembly-flavor intel", + "handle SIGALRM print nopass", + ) + for cmd in gdb_initial_settings: try: - pyenv = which("pyenv") - PYENV_ROOT = gef_pystring(subprocess.check_output([pyenv, "root"]).strip()) - PYENV_VERSION = gef_pystring(subprocess.check_output([pyenv, "version-name"]).strip()) - site_packages_dir = os.path.join(PYENV_ROOT, "versions", PYENV_VERSION, "lib", - "python{}".format(PYENV_VERSION[:3]), "site-packages") - site.addsitedir(site_packages_dir) - except FileNotFoundError: + gdb.execute(cmd) + except gdb.error: pass - # When using a Python virtual environment, GDB still loads the system-installed Python - # so GEF doesn't load site-packages dir from environment - # In order to fix it, from the shell with venv activated we run the python binary, - # take and parse its path, add the path to the current python process using sys.path.extend - - PYTHONBIN = which("python3") - PREFIX = gef_pystring(subprocess.check_output([PYTHONBIN, '-c', 'import os, sys;print((sys.prefix))'])).strip("\\n") - if PREFIX != sys.base_prefix: - SITE_PACKAGES_DIRS = subprocess.check_output( - [PYTHONBIN, "-c", "import os, sys;print(os.linesep.join(sys.path).strip())"]).decode("utf-8").split() - sys.path.extend(SITE_PACKAGES_DIRS) - - # setup prompt - gdb.prompt_hook = __gef_prompt__ - - # setup config - gdb_initial_settings = ( - "set confirm off", - "set verbose off", - "set pagination off", - "set print elements 0", - "set history save on", - "set history filename ~/.gdb_history", - "set output-radix 0x10", - "set print pretty on", - "set disassembly-flavor intel", - "handle SIGALRM print nopass", - ) - for cmd in gdb_initial_settings: - try: - gdb.execute(cmd) - except gdb.error: - pass - - # load GEF - reset() + # load GEF + reset() - # gdb events configuration - gef_on_continue_hook(continue_handler) - gef_on_stop_hook(hook_stop_handler) - gef_on_new_hook(new_objfile_handler) - gef_on_exit_hook(exit_handler) - gef_on_memchanged_hook(memchanged_handler) - gef_on_regchanged_hook(regchanged_handler) + # gdb events configuration + gef_on_continue_hook(continue_handler) + gef_on_stop_hook(hook_stop_handler) + gef_on_new_hook(new_objfile_handler) + gef_on_exit_hook(exit_handler) + gef_on_memchanged_hook(memchanged_handler) + gef_on_regchanged_hook(regchanged_handler) - if gdb.current_progspace().filename is not None: - # if here, we are sourcing gef from a gdb session already attached - # we must force a call to the new_objfile handler (see issue #278) - new_objfile_handler(None) + if gdb.current_progspace().filename is not None: + # if here, we are sourcing gef from a gdb session already attached + # we must force a call to the new_objfile handler (see issue #278) + new_objfile_handler(None) - GefTmuxSetup() + GefTmuxSetup() diff --git a/tests/helpers.py b/tests/helpers.py index 6ff1b7188..154413b2c 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -159,3 +159,16 @@ def _target(name: str, extension: str = ".out") -> Path: if not target.exists(): raise FileNotFoundError(f"Could not find file '{target}'") return target + + +def start_gdbserver(exe=_target("default"), port=1234): + return subprocess.Popen(["gdbserver", f":{port}", exe], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + +def stop_gdbserver(gdbserver): + """Stops the gdbserver and waits until it is terminated if it was + still running. Needed to make the used port available again.""" + if gdbserver.poll() is None: + gdbserver.kill() + gdbserver.wait() + return \ No newline at end of file diff --git a/tests/runtests.py b/tests/runtests.py index 1ffd365cd..0bc631c3a 100755 --- a/tests/runtests.py +++ b/tests/runtests.py @@ -22,7 +22,8 @@ include_for_architectures, ARCH, is_64b, - _target + _target, + start_gdbserver, stop_gdbserver, ) BIN_LS = Path("/bin/ls") @@ -197,18 +198,6 @@ def test_cmd_got(self): return def test_cmd_gef_remote(self): - def start_gdbserver(exe=_target("default"), port=1234): - return subprocess.Popen(["gdbserver", f":{port}", exe], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - - def stop_gdbserver(gdbserver): - """Stops the gdbserver and waits until it is terminated if it was - still running. Needed to make the used port available again.""" - if gdbserver.poll() is None: - gdbserver.kill() - gdbserver.wait() - return - before = ["gef-remote :1234"] gdbserver = start_gdbserver() res = gdb_start_silent_cmd("vmmap", before=before) @@ -900,6 +889,14 @@ def test_func_parse_address(self): self.assertException(res) return + def test_func_download_file(self): + gdbsrv = start_gdbserver(BIN_LS) + func = f"download_file('{str(BIN_LS)}')" + res = gdb_test_python_method(func) + stop_gdbserver(gdbsrv) + self.assertNoException(res) + return + class TestGdbFunctionsUnit(GefUnitTestGeneric): """Tests gdb convenience functions added by GEF."""