diff --git a/examples/change_watcher_plugin/bs_change_watcher/__init__.py b/examples/change_watcher_plugin/bs_change_watcher/__init__.py index 6dbf11e5..04f20443 100644 --- a/examples/change_watcher_plugin/bs_change_watcher/__init__.py +++ b/examples/change_watcher_plugin/bs_change_watcher/__init__.py @@ -15,19 +15,20 @@ def create_plugin(*args, **kwargs): FunctionHeader, StackVariable, Enum, Struct, GlobalVariable, Comment ) - generic_printer = lambda *x, **y: print(f"Changed {x}{y}") - callback_handlers = { - typ: [generic_printer] for typ in (FunctionHeader, StackVariable, Enum, Struct, GlobalVariable, Comment,) - } deci = DecompilerInterface.discover_interface( plugin_name="ArtifactChangeWatcher", init_plugin=True, - artifact_write_callbacks=callback_handlers, ui_init_args=args, ui_init_kwargs=kwargs ) + # create a function to print a string in the decompiler console + decompiler_printer = lambda *x, **y: deci.print(f"Changed {x}{y}") + # register the callback for all the types we want to print + deci.artifact_write_callbacks = { + typ: [decompiler_printer] for typ in (FunctionHeader, StackVariable, Enum, Struct, GlobalVariable, Comment,) + } - # register a ctx_menu_item late since we want the callback to be inside the deci + # register a menu to open when you right click on the psuedocode view deci.register_ctx_menu_item( "StartArtifactChangeWatcher", "Start watching artifact changes", @@ -85,5 +86,10 @@ def install_angr(self, path=None, interactive=True): if not path: return + path = path / "bs_change_watcher" + path.mkdir(parents=True, exist_ok=True) + src = self.pkg_path / "plugin.toml" + dst = Path(path) / "plugin.toml" + self.link_or_copy(src, dst, symlink=True) self._copy_plugin_to_path(path) - return path \ No newline at end of file + return path diff --git a/examples/change_watcher_plugin/bs_change_watcher/bs_change_watcher_plugin.py b/examples/change_watcher_plugin/bs_change_watcher/bs_change_watcher_plugin.py index 66cfef04..f50a6272 100644 --- a/examples/change_watcher_plugin/bs_change_watcher/bs_change_watcher_plugin.py +++ b/examples/change_watcher_plugin/bs_change_watcher/bs_change_watcher_plugin.py @@ -34,14 +34,28 @@ def create_plugin(*args, **kwargs): has_ida = True except ImportError: has_ida = False + try: + import angrmanagement + has_angr = True + except ImportError: + has_angr = False - if not has_ida: + if not has_ida and not has_angr: create_plugin() + elif has_angr: + from angrmanagement.plugins import BasePlugin + class AngrBSPluginThunk(BasePlugin): + def __init__(self, workspace): + super().__init__(workspace) + globals()["workspace"] = workspace + self.plugin = create_plugin() + + def teardown(self): + pass def PLUGIN_ENTRY(*args, **kwargs): """ This is the entry point for IDA to load the plugin. """ - print("[+] Loading callback_watcher plugin (1/2)") return create_plugin(*args, **kwargs) diff --git a/examples/change_watcher_plugin/bs_change_watcher/plugin.toml b/examples/change_watcher_plugin/bs_change_watcher/plugin.toml new file mode 100644 index 00000000..992eabf1 --- /dev/null +++ b/examples/change_watcher_plugin/bs_change_watcher/plugin.toml @@ -0,0 +1,13 @@ +[meta] +plugin_metadata_version = 0 + +[plugin] +name = "bs_change_watcher" +shortname = "bs_change_watcher" +version = "0.0.0" +description = "" +long_description = "" +platforms = ["windows", "linux", "macos"] +min_angr_version = "9.0.0.0" +author = "The BinSync Team" +entrypoints = ["bs_change_watcher_plugin.py"] \ No newline at end of file diff --git a/examples/template_plugin_entry.py b/examples/template_plugin_entry.py index 1b38ef38..49596799 100644 --- a/examples/template_plugin_entry.py +++ b/examples/template_plugin_entry.py @@ -37,9 +37,24 @@ def create_plugin(*args, **kwargs): has_ida = True except ImportError: has_ida = False + try: + import angrmanagement + has_angr = True + except ImportError: + has_angr = False - if not has_ida: + if not has_ida and not has_angr: create_plugin() + elif has_angr: + from angrmanagement.plugins import BasePlugin + class AngrBSPluginThunk(BasePlugin): + def __init__(self, workspace): + super().__init__(workspace) + globals()["workspace"] = workspace + self.plugin = create_plugin() + + def teardown(self): + pass def PLUGIN_ENTRY(*args, **kwargs): diff --git a/libbs/__init__.py b/libbs/__init__.py index 777f190d..3e2f46a3 100644 --- a/libbs/__init__.py +++ b/libbs/__init__.py @@ -1 +1 @@ -__version__ = "0.8.0" +__version__ = "0.9.0" diff --git a/libbs/api/decompiler_interface.py b/libbs/api/decompiler_interface.py index f0ca7648..f088b84f 100644 --- a/libbs/api/decompiler_interface.py +++ b/libbs/api/decompiler_interface.py @@ -86,6 +86,7 @@ def __init__( self._gui_ctx_menu_actions = [] self._plugin_name = plugin_name self.gui_plugin = None + self._artifact_watchers_started = False # locks self.artifact_write_lock = threading.Lock() @@ -120,7 +121,8 @@ def start_artifact_watchers(self): @return: """ - pass + self.info("Starting BinSync artifact watchers...") + self._artifact_watchers_started = True def stop_artifact_watchers(self): """ @@ -129,7 +131,8 @@ def stop_artifact_watchers(self): decompiler. This is useful for plugins that want to watch for changes in the decompiler and react to them. """ - pass + self.info("Stopping BinSync artifact watchers...") + self._artifact_watchers_started = False def _init_ui_components(self, *args, **kwargs): from libbs.ui.version import set_ui_version @@ -591,9 +594,12 @@ def artifact_set_event_handler( return had_changes # - # Special Loggers + # Special Loggers and Printers # + def print(self, msg: str, **kwargs): + print(msg) + def info(self, msg: str, **kwargs): _l.info(msg) diff --git a/libbs/decompilers/angr/compat.py b/libbs/decompilers/angr/compat.py index 65b4dfe8..723ff367 100644 --- a/libbs/decompilers/angr/compat.py +++ b/libbs/decompilers/angr/compat.py @@ -5,6 +5,10 @@ from angrmanagement.plugins import BasePlugin from angrmanagement.ui.workspace import Workspace +from libbs.data import ( + StackVariable, FunctionHeader, Enum, Struct, GlobalVariable, Comment, FunctionArgument +) + if typing.TYPE_CHECKING: from .interface import AngrInterface @@ -89,46 +93,83 @@ def handle_stack_var_renamed(self, func, offset, old_name, new_name): decompilation = self.interface.decompile_function(func) stack_var = self.interface.find_stack_var_in_codegen(decompilation, offset) - var_type = AngrInterface.stack_var_type_str(decompilation, stack_var) - return False + self.interface.stack_variable_changed(StackVariable(offset, new_name, None, stack_var.size, func.addr)) + return True # pylint: disable=unused-argument def handle_stack_var_retyped(self, func, offset, old_type, new_type): decompilation = self.interface.decompile_function(func) stack_var = self.interface.find_stack_var_in_codegen(decompilation, offset) - return False + var_type = AngrInterface.stack_var_type_str(decompilation, stack_var) + self.interface.stack_variable_changed(StackVariable(offset, stack_var.name, var_type, stack_var.size, func.addr)) + return True # pylint: disable=unused-argument def handle_func_arg_renamed(self, func, offset, old_name, new_name): decompilation = self.interface.decompile_function(func) func_args = AngrInterface.func_args_as_libbs_args(decompilation) - func_type = decompilation.cfunc.functy.returnty.c_repr() - return False + self.interface.function_header_changed( + FunctionHeader( + None, + func.addr, + type_=None, + args={offset: FunctionArgument(offset, new_name, None, func_args[offset].size)}, + ) + ) + + return True # pylint: disable=unused-argument def handle_func_arg_retyped(self, func, offset, old_type, new_type): decompilation = self.interface.decompile_function(func) func_args = AngrInterface.func_args_as_libbs_args(decompilation) - func_type = decompilation.cfunc.functy.returnty.c_repr() - return False + self.interface.function_header_changed( + FunctionHeader( + None, + func.addr, + type_=None, + args={offset: FunctionArgument(offset, None, new_type, func_args[offset].size)}, + ) + ) + + return True # pylint: disable=unused-argument,no-self-use def handle_global_var_renamed(self, address, old_name, new_name): - return False + self.interface.global_variable_changed( + GlobalVariable(address, new_name, type_=None) + ) + return True # pylint: disable=unused-argument,no-self-use def handle_global_var_retyped(self, address, old_type, new_type): - return False + self.interface.global_variable_changed( + GlobalVariable(address, None, type_=new_type) + ) + return True # pylint: disable=unused-argument def handle_function_renamed(self, func, old_name, new_name): - return False + if func is None: + return False + + self.interface.function_header_changed(FunctionHeader(new_name, func.addr)) + return True # pylint: disable=unused-argument,no-self-use def handle_function_retyped(self, func, old_type, new_type): - return False + if func is None: + return False + + self.interface.function_header_changed(FunctionHeader(None, func.addr, type_=new_type)) + return True # pylint: disable=unused-argument def handle_comment_changed(self, address, old_cmt, new_cmt, created: bool, decomp: bool): + # comments are only possible in functions in AM func_addr = self.interface.get_closest_function(address) - return False + if func_addr is None: + return False + + self.interface.comment_changed(Comment(address, new_cmt, func_addr=func_addr, decompiled=True)) + return True diff --git a/libbs/decompilers/angr/interface.py b/libbs/decompilers/angr/interface.py index ce581e85..f745383d 100644 --- a/libbs/decompilers/angr/interface.py +++ b/libbs/decompilers/angr/interface.py @@ -34,6 +34,7 @@ class AngrInterface(DecompilerInterface): """ def __init__(self, workspace=None, headless=False, binary_path: Path = None, **kwargs): + plugin_name = kwargs.get("plugin_name", "generic_plugin") if workspace is None and not headless: l.critical("The workspace provided is None, which will result in a broken BinSync.") return @@ -42,6 +43,12 @@ def __init__(self, workspace=None, headless=False, binary_path: Path = None, **k self.main_instance = workspace.main_instance if workspace else self self._binary_path = Path(binary_path) if binary_path is not None else binary_path self._ctx_menu_items = [] + if not headless: + self._am_logger = logging.getLogger(f"angrmanagement.{plugin_name}") + self._am_logger.setLevel(logging.INFO) + else: + self._am_logger = None + super().__init__(name="angr", artifact_lifter=AngrArtifactLifter(self), headless=headless, **kwargs) def _init_headless_components(self): @@ -52,6 +59,10 @@ def _init_headless_components(self): cfg = self.project.analyses.CFG(show_progressbar=True, normalize=True, data_references=True) self.project.analyses.CompleteCallingConventions(cfg=cfg, recover_variables=True) + # + # Decompiler API + # + def binary_hash(self) -> str: return self.main_instance.project.loader.main_object.md5.hex() @@ -83,9 +94,6 @@ def rebase_addr(self, addr, up=False): return rebased_addr - def goto_address(self, func_addr): - self.workspace.jump_to(self.rebase_addr(func_addr, up=True)) - def xrefs_to(self, artifact: Artifact) -> List[Artifact]: if not isinstance(artifact, Function): l.warning("xrefs_to is only implemented for functions.") @@ -111,6 +119,15 @@ def xrefs_to(self, artifact: Artifact) -> List[Artifact]: return xrefs def _decompile(self, function: Function) -> Optional[str]: + if function.dec_obj is not None: + dec_text = function.dec_obj.text + else: + function.dec_obj = self.get_decompilation_object(function) + dec_text = function.dec_obj.text if function.dec_obj else None + + return dec_text + + def get_decompilation_object(self, function: Function) -> Optional[object]: func = self.main_instance.project.kb.functions.get(function.addr, None) if func is None: return None @@ -121,10 +138,25 @@ def _decompile(self, function: Function) -> Optional[str]: l.warning(f"Failed to decompile {func} because {e}") codegen = None - if not codegen or not codegen.text: - return None + return codegen - return codegen.text + def local_variable_names(self, func: Function) -> List[str]: + codegen = self.decompile_function(self.main_instance.project.kb.functions[func.addr]) + if not codegen or not codegen.cfunc or not codegen.cfunc.variable_manager: + return [] + + return [v.name for v in codegen.cfunc.variable_manager._unified_variables] + + def rename_local_variables_by_names(self, func: Function, name_map: Dict[str, str]) -> bool: + codegen = self.decompile_function(self.main_instance.project.kb.functions[func.addr]) + if not codegen or not codegen.cfunc or not codegen.cfunc.variable_manager: + return False + + for v in codegen.cfunc.variable_manager._unified_variables: + if v.name in name_map: + v.name = name_map[v.name] + + return self.refresh_decompilation(func.addr) # # GUI API @@ -135,6 +167,9 @@ def _init_gui_plugin(self, *args, **kwargs): self.workspace.plugins.register_active_plugin(self._plugin_name, self.gui_plugin) return self.gui_plugin + def goto_address(self, func_addr): + self.workspace.jump_to(self.rebase_addr(func_addr, up=True)) + def register_ctx_menu_item(self, name, action_string, callback_func, category=None) -> bool: if self.gui_plugin is None: l.critical("Cannot register context menu item without a GUI plugin.") @@ -275,17 +310,48 @@ def _set_comment(self, comment: Comment, decompilation=None, **kwargs) -> bool: self.main_instance.project.kb.comments[comment.addr] = comment.comment changed |= True - return changed + func_addr = comment.func_addr or self.get_closest_function(comment.addr) + return changed & self.refresh_decompilation(func_addr) # # Utils # + def info(self, msg: str, **kwargs): + if self._am_logger is not None: + self._am_logger.info(msg) + + def debug(self, msg: str, **kwargs): + if self._am_logger is not None: + self._am_logger.debug(msg) + + def warning(self, msg: str, **kwargs): + if self._am_logger is not None: + self._am_logger.warning(msg) + + def error(self, msg: str, **kwargs): + if self._am_logger is not None: + self._am_logger.error(msg) + + def print(self, msg: str, **kwargs): + if self.headless: + print(msg) + else: + self.info(msg) + + # + # angr-management specific helpers + # + def refresh_decompilation(self, func_addr): + if self.headless: + return False + self.main_instance.workspace.jump_to(func_addr) view = self.main_instance.workspace._get_or_create_view("pseudocode", CodeView) view.codegen.am_event() view.focus() + return True def _headless_decompile(self, func): all_optimization_passes = angr.analyses.decompiler.optimization_passes.get_default_optimization_passes( @@ -344,10 +410,6 @@ def decompile_function(self, func, refresh_gui=False): return decomp - # - # Function Data Helpers - # - @staticmethod def find_stack_var_in_codegen(decompilation, stack_offset: int) -> Optional[angr.sim_variable.SimStackVariable]: for var in decompilation.cfunc.variable_manager._unified_variables: diff --git a/libbs/decompilers/ghidra/interface.py b/libbs/decompilers/ghidra/interface.py index f9bd99ae..9b522bba 100644 --- a/libbs/decompilers/ghidra/interface.py +++ b/libbs/decompilers/ghidra/interface.py @@ -466,20 +466,23 @@ def global_vars(self) -> Dict[int, GlobalVariable]: # Specialized print handlers # - def info(self, msg: str): + def print(self, msg, print_local=True, **kwargs): + self.ghidra.print(msg, print_local=print_local) + + def info(self, msg: str, **kwargs): _l.info(msg) self.print(self._fmt_log_msg(msg, "INFO"), print_local=False) - def debug(self, msg: str): + def debug(self, msg: str, **kwargs): _l.debug(msg) if _l.level >= logging.DEBUG: self.print(self._fmt_log_msg(msg, "DEBUG"), print_local=False) - def warning(self, msg: str): + def warning(self, msg: str, **kwargs): _l.warning(msg) self.print(self._fmt_log_msg(msg, "WARNING"), print_local=False) - def error(self, msg: str): + def error(self, msg: str, **kwargs): _l.error(msg) self.print(self._fmt_log_msg(msg, "ERROR"), print_local=False) @@ -494,9 +497,6 @@ def _fmt_log_msg(msg: str, level: str): return f"[{level}] | {log_path} | {msg}" - def print(self, string, print_local=True): - self.ghidra.print(string, print_local=print_local) - # # Ghidra Specific API # diff --git a/libbs/plugin_installer.py b/libbs/plugin_installer.py index f7a91b40..d78c06f4 100644 --- a/libbs/plugin_installer.py +++ b/libbs/plugin_installer.py @@ -223,10 +223,10 @@ def install_angr(self, path=None, interactive=True): import angrmanagement except ImportError: angr_resolved = False - default_path = Path(angrmanagement.__file__).parent if angr_resolved else None + default_path = Path(angrmanagement.__file__).parent / "plugins" if angr_resolved else None default_path, skip_ask = self._get_path_without_ask(path, default_path=default_path, interactive=interactive) - return self.ask_path("angr-management", "Angr Install Path", default=default_path) if not skip_ask \ + return self.ask_path("angr-management", "angr-management Plugins Path", default=default_path) if not skip_ask \ else default_path def install_gdb(self, path=None, interactive=True):