diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f03fea3c6..0f1503ead 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -19,7 +19,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | - python -m pip install --upgrade pip pytest pytest-timeout + python -m pip install -e .[dev] - name: Test with pytest run: | pytest -v diff --git a/CHANGELOG.md b/CHANGELOG.md index 018ace895..3a10a7c9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,6 @@ # **Upcoming release** - -## XXX - -- XXX +## New feature +- #464 Improve autoimport code to use a sqlite3 database, cache all available modules quickly, search for names and produce import statements, sort import statements. # Release 1.0.0 diff --git a/docs/library.rst b/docs/library.rst index 60f6303e6..a4031f375 100644 --- a/docs/library.rst +++ b/docs/library.rst @@ -835,6 +835,23 @@ returns the list of modules with the given global name. ``AutoImport.import_assist()`` tries to find the modules that have a global name that starts with the given prefix. +It uses an sqlite3 database, which can be made persistent by passing ``memory=False`` to the constructor. +It must be closed when done with the ``AutoImport.close()`` method. + +AutoImport can search for a name from both modules and statements you can import from them. + +.. 
code-block:: python + + from rope.base.project import Project + from rope.contrib.autoimport import AutoImport + + project = Project("/path/to/project") + autoimport = AutoImport(project, memory=False) + autoimport.generate_cache() # Generates a cache of the local modules, from the project you're working on + autoimport.generate_modules_cache() # Generates a cache of external modules + print(autoimport.search("Dict")) + autoimport.close() + Cross-Project Refactorings -------------------------- diff --git a/rope/contrib/autoimport.py b/rope/contrib/autoimport.py deleted file mode 100644 index 4d6890e2a..000000000 --- a/rope/contrib/autoimport.py +++ /dev/null @@ -1,226 +0,0 @@ -import re - -from rope.base import builtins -from rope.base import exceptions -from rope.base import libutils -from rope.base import pynames -from rope.base import pyobjects -from rope.base import resources -from rope.base import resourceobserver -from rope.base import taskhandle -from rope.refactor import importutils - - -class AutoImport(object): - """A class for finding the module that provides a name - - This class maintains a cache of global names in python modules. - Note that this cache is not accurate and might be out of date. - - """ - - def __init__(self, project, observe=True, underlined=False): - """Construct an AutoImport object - - If `observe` is `True`, listen for project changes and update - the cache. - - If `underlined` is `True`, underlined names are cached, too. 
- """ - self.project = project - self.underlined = underlined - self.names = project.data_files.read_data("globalnames") - if self.names is None: - self.names = {} - project.data_files.add_write_hook(self._write) - # XXX: using a filtered observer - observer = resourceobserver.ResourceObserver( - changed=self._changed, moved=self._moved, removed=self._removed - ) - if observe: - project.add_observer(observer) - - def import_assist(self, starting): - """Return a list of ``(name, module)`` tuples - - This function tries to find modules that have a global name - that starts with `starting`. - """ - # XXX: breaking if gave up! use generators - result = [] - for module in self.names: - for global_name in self.names[module]: - if global_name.startswith(starting): - result.append((global_name, module)) - return result - - def get_modules(self, name): - """Return the list of modules that have global `name`""" - result = [] - for module in self.names: - if name in self.names[module]: - result.append(module) - return result - - def get_all_names(self): - """Return the list of all cached global names""" - result = set() - for module in self.names: - result.update(set(self.names[module])) - return result - - def get_name_locations(self, name): - """Return a list of ``(resource, lineno)`` tuples""" - result = [] - for module in self.names: - if name in self.names[module]: - try: - pymodule = self.project.get_module(module) - if name in pymodule: - pyname = pymodule[name] - module, lineno = pyname.get_definition_location() - if module is not None: - resource = module.get_module().get_resource() - if resource is not None and lineno is not None: - result.append((resource, lineno)) - except exceptions.ModuleNotFoundError: - pass - return result - - def generate_cache( - self, resources=None, underlined=None, task_handle=taskhandle.NullTaskHandle() - ): - """Generate global name cache for project files - - If `resources` is a list of `rope.base.resource.File`, only - those files are 
searched; otherwise all python modules in the - project are cached. - - """ - if resources is None: - resources = self.project.get_python_files() - job_set = task_handle.create_jobset( - "Generating autoimport cache", len(resources) - ) - for file in resources: - job_set.started_job("Working on <%s>" % file.path) - self.update_resource(file, underlined) - job_set.finished_job() - - def generate_modules_cache( - self, modules, underlined=None, task_handle=taskhandle.NullTaskHandle() - ): - """Generate global name cache for modules listed in `modules`""" - job_set = task_handle.create_jobset( - "Generating autoimport cache for modules", len(modules) - ) - for modname in modules: - job_set.started_job("Working on <%s>" % modname) - if modname.endswith(".*"): - mod = self.project.find_module(modname[:-2]) - if mod: - for sub in submodules(mod): - self.update_resource(sub, underlined) - else: - self.update_module(modname, underlined) - job_set.finished_job() - - def clear_cache(self): - """Clear all entries in global-name cache - - It might be a good idea to use this function before - regenerating global names. 
- - """ - self.names.clear() - - def find_insertion_line(self, code): - """Guess at what line the new import should be inserted""" - match = re.search(r"^(def|class)\s+", code) - if match is not None: - code = code[: match.start()] - try: - pymodule = libutils.get_string_module(self.project, code) - except exceptions.ModuleSyntaxError: - return 1 - testmodname = "__rope_testmodule_rope" - importinfo = importutils.NormalImport(((testmodname, None),)) - module_imports = importutils.get_module_imports(self.project, pymodule) - module_imports.add_import(importinfo) - code = module_imports.get_changed_source() - offset = code.index(testmodname) - lineno = code.count("\n", 0, offset) + 1 - return lineno - - def update_resource(self, resource, underlined=None): - """Update the cache for global names in `resource`""" - try: - pymodule = self.project.get_pymodule(resource) - modname = self._module_name(resource) - self._add_names(pymodule, modname, underlined) - except exceptions.ModuleSyntaxError: - pass - - def update_module(self, modname, underlined=None): - """Update the cache for global names in `modname` module - - `modname` is the name of a module. 
- """ - try: - pymodule = self.project.get_module(modname) - self._add_names(pymodule, modname, underlined) - except exceptions.ModuleNotFoundError: - pass - - def _module_name(self, resource): - return libutils.modname(resource) - - def _add_names(self, pymodule, modname, underlined): - if underlined is None: - underlined = self.underlined - globals = [] - if isinstance(pymodule, pyobjects.PyDefinedObject): - attributes = pymodule._get_structural_attributes() - else: - attributes = pymodule.get_attributes() - for name, pyname in attributes.items(): - if not underlined and name.startswith("_"): - continue - if isinstance(pyname, (pynames.AssignedName, pynames.DefinedName)): - globals.append(name) - if isinstance(pymodule, builtins.BuiltinModule): - globals.append(name) - self.names[modname] = globals - - def _write(self): - self.project.data_files.write_data("globalnames", self.names) - - def _changed(self, resource): - if not resource.is_folder(): - self.update_resource(resource) - - def _moved(self, resource, newresource): - if not resource.is_folder(): - modname = self._module_name(resource) - if modname in self.names: - del self.names[modname] - self.update_resource(newresource) - - def _removed(self, resource): - if not resource.is_folder(): - modname = self._module_name(resource) - if modname in self.names: - del self.names[modname] - - -def submodules(mod): - if isinstance(mod, resources.File): - if mod.name.endswith(".py") and mod.name != "__init__.py": - return set([mod]) - return set() - if not mod.has_child("__init__.py"): - return set() - result = set([mod]) - for child in mod.get_children(): - result |= submodules(child) - return result diff --git a/rope/contrib/autoimport/__init__.py b/rope/contrib/autoimport/__init__.py new file mode 100644 index 000000000..1990bad21 --- /dev/null +++ b/rope/contrib/autoimport/__init__.py @@ -0,0 +1,4 @@ +"""AutoImport module for rope.""" +from .autoimport import AutoImport + +__all__ = ["AutoImport"] diff --git 
a/rope/contrib/autoimport/autoimport.py b/rope/contrib/autoimport/autoimport.py new file mode 100644 index 000000000..7e03d38d9 --- /dev/null +++ b/rope/contrib/autoimport/autoimport.py @@ -0,0 +1,524 @@ +"""AutoImport module for rope.""" +import pathlib +import re +import sqlite3 +import sys +from collections import OrderedDict +from concurrent.futures import Future, ProcessPoolExecutor, as_completed +from itertools import chain +from typing import Generator, Iterable, List, Optional, Set, Tuple + +from rope.base import exceptions, libutils, resourceobserver, taskhandle +from rope.base.project import Project +from rope.base.resources import Resource +from rope.contrib.autoimport.defs import ( + ModuleFile, + Name, + NameType, + Package, + PackageType, + SearchResult, + Source, +) +from rope.contrib.autoimport.parse import get_names +from rope.contrib.autoimport.utils import ( + get_files, + get_modname_from_path, + get_package_tuple, + sort_and_deduplicate, + sort_and_deduplicate_tuple, +) +from rope.refactor import importutils + + +def get_future_names( + packages: List[Package], + underlined: bool, + job_set: taskhandle.JobSet, +) -> Generator[Future[Iterable[Name]], None, None]: + """Get all names as futures.""" + with ProcessPoolExecutor() as executor: + for package in packages: + for module in get_files(package, underlined): + job_set.started_job(module.modname) + if not isinstance(job_set, taskhandle.NullJobSet): + job_set.count += 1 + yield executor.submit(get_names, module, package) + + +def filter_packages( + packages: Iterable[Package], underlined: bool, existing: List[str] +) -> Iterable[Package]: + """Filter list of packages to parse.""" + if underlined: + + def filter_package(package: Package) -> bool: + return package.name not in existing + + else: + + def filter_package(package: Package) -> bool: + return package.name not in existing and not package.name.startswith("_") + + return filter(filter_package, packages) + + +class AutoImport: + """A class 
for finding the module that provides a name. + + This class maintains a cache of global names in python modules. + Note that this cache is not accurate and might be out of date. + + """ + + connection: sqlite3.Connection + underlined: bool + rope_project: Project + project: Package + + def __init__(self, project: Project, observe=True, underlined=False, memory=True): + """Construct an AutoImport object. + + Parameters + ___________ + project : rope.base.project.Project + the project to use for project imports + observe : bool + if true, listen for project changes and update the cache. + underlined : bool + If `underlined` is `True`, underlined names are cached, too. + memory : bool + if true, don't persist to disk + """ + self.rope_project = project + project_package = get_package_tuple( + pathlib.Path(project.root.real_path), project + ) + assert project_package is not None + assert project_package.path is not None + self.project = project_package + self.underlined = underlined + db_path: str + if memory or project.ropefolder is None: + db_path = ":memory:" + else: + db_path = f"{project.ropefolder.path}/autoimport.db" + self.connection = sqlite3.connect(db_path) + self._setup_db() + if observe: + observer = resourceobserver.ResourceObserver( + changed=self._changed, moved=self._moved, removed=self._removed + ) + project.add_observer(observer) + + def _setup_db(self): + packages_table = "(package TEXT)" + names_table = ( + "(name TEXT, module TEXT, package TEXT, source INTEGER, type INTEGER)" + ) + self.connection.execute(f"create table if not exists names{names_table}") + self.connection.execute(f"create table if not exists packages{packages_table}") + self.connection.execute("CREATE INDEX IF NOT EXISTS name on names(name)") + self.connection.execute("CREATE INDEX IF NOT EXISTS module on names(module)") + self.connection.execute("CREATE INDEX IF NOT EXISTS package on names(package)") + self.connection.commit() + + def import_assist(self, starting: str): + """ + 
Find modules that have a global name that starts with `starting`. + + For a more complete list, use the search or search_full methods. + + Parameters + __________ + starting : str + what all the names should start with + Return + __________ + Return a list of ``(name, module)`` tuples + """ + results = self.connection.execute( + "select name, module, source from names WHERE name LIKE (?)", + (starting + "%",), + ).fetchall() + return sort_and_deduplicate_tuple( + results + ) # Remove duplicates from multiple occurences of the same item + + def search(self, name: str, exact_match: bool = False) -> List[Tuple[str, str]]: + """ + Search both modules and names for an import string. + + This is a simple wrapper around search_full with basic sorting based on Source. + + Returns a sorted list of import statement, modname pairs + """ + results: List[Tuple[str, str, int]] = [ + (statement, import_name, source) + for statement, import_name, source, type in self.search_full( + name, exact_match + ) + ] + return sort_and_deduplicate_tuple(results) + + def search_full( + self, + name: str, + exact_match: bool = False, + ignored_names: Set[str] = set(), + ) -> Generator[SearchResult, None, None]: + """ + Search both modules and names for an import string. + + Parameters + __________ + name: str + Name to search for + exact_match: bool + If using exact_match, only search for that name. + Otherwise, search for any name starting with that name. + ignored_names : Set[str] + Will ignore any names in this set + + Return + __________ + Unsorted Generator of SearchResults. Each is guaranteed to be unique. + """ + results = set(self._search_name(name, exact_match)) + results = results.union(self._search_module(name, exact_match)) + for result in results: + if result.name not in ignored_names: + yield result + + def _search_name( + self, name: str, exact_match: bool = False + ) -> Generator[SearchResult, None, None]: + """ + Search both names for avalible imports. 
+ + Returns the import statement, import name, source, and type. + """ + if not exact_match: + name = name + "%" # Makes the query a starts_with query + for import_name, module, source, name_type in self.connection.execute( + "SELECT name, module, source, type FROM names WHERE name LIKE (?)", (name,) + ): + yield ( + SearchResult( + f"from {module} import {import_name}", + import_name, + source, + name_type, + ) + ) + + def _search_module( + self, name: str, exact_match: bool = False + ) -> Generator[SearchResult, None, None]: + """ + Search both modules for avalible imports. + + Returns the import statement, import name, source, and type. + """ + if not exact_match: + name = name + "%" # Makes the query a starts_with query + for module, source in self.connection.execute( + "Select module, source FROM names where module LIKE (?)", + ("%." + name,), + ): + parts = module.split(".") + import_name = parts[-1] + remaining = parts[0] + for part in parts[1:-1]: + remaining += "." + remaining += part + yield ( + SearchResult( + f"from {remaining} import {import_name}", + import_name, + source, + NameType.Module.value, + ) + ) + for module, source in self.connection.execute( + "Select module, source from names where module LIKE (?)", (name,) + ): + if "." 
in module: + continue + yield SearchResult( + f"import {module}", module, source, NameType.Module.value + ) + + def get_modules(self, name) -> List[str]: + """Get the list of modules that have global `name`.""" + results = self.connection.execute( + "SELECT module, source FROM names WHERE name LIKE (?)", (name,) + ).fetchall() + return sort_and_deduplicate(results) + + def get_all_names(self) -> List[str]: + """Get the list of all cached global names.""" + results = self.connection.execute("select name from names").fetchall() + return results + + def _dump_all(self) -> Tuple[List[Name], List[Package]]: + """Dump the entire database.""" + name_results = self.connection.execute("select * from names").fetchall() + package_results = self.connection.execute("select * from packages").fetchall() + return name_results, package_results + + def generate_cache( + self, + resources: List[Resource] = None, + underlined: bool = False, + task_handle=taskhandle.NullTaskHandle(), + ): + """Generate global name cache for project files. + + If `resources` is a list of `rope.base.resource.File`, only + those files are searched; otherwise all python modules in the + project are cached. 
+ """ + if resources is None: + resources = self.rope_project.get_python_files() + job_set = task_handle.create_jobset( + "Generating autoimport cache", len(resources) + ) + self.connection.execute( + "delete from names where package = ?", (self.project.name,) + ) + futures = [] + with ProcessPoolExecutor() as executor: + for file in resources: + job_set.started_job(f"Working on {file.path}") + module = self._resource_to_module(file, underlined) + futures.append(executor.submit(get_names, module, self.project)) + for future in as_completed(futures): + self._add_names(future.result()) + job_set.finished_job() + self.connection.commit() + + def generate_modules_cache( + self, + modules: List[str] = None, + task_handle=taskhandle.NullTaskHandle(), + single_thread: bool = False, + underlined: bool = False, + ): + """ + Generate global name cache for external modules listed in `modules`. + + If no modules are provided, it will generate a cache for every module avalible. + This method searches in your sys.path and configured python folders. + Do not use this for generating your own project's internal names, + use generate_resource_cache for that instead. 
+ """ + packages: List[Package] = [] + if self.underlined: + underlined = True + existing = self._get_existing() + if modules is None: + packages = self._get_available_packages() + else: + for modname in modules: + package = self._find_package_path(modname) + if package is None: + continue + packages.append(package) + packages = list(filter_packages(packages, underlined, existing)) + if len(packages) == 0: + return + self._add_packages(packages) + job_set = task_handle.create_jobset("Generating autoimport cache", 0) + if single_thread: + for package in packages: + for module in get_files(package, underlined): + job_set.started_job(module.modname) + for name in get_names(module, package): + self._add_name(name) + job_set.finished_job() + else: + for future_name in as_completed( + get_future_names(packages, underlined, job_set) + ): + self._add_names(future_name.result()) + job_set.finished_job() + + self.connection.commit() + + def update_module(self, module: str): + """Update a module in the cache, or add it if it doesn't exist.""" + self._del_if_exist(module) + self.generate_modules_cache([module]) + + def close(self): + """Close the autoimport database.""" + self.connection.commit() + self.connection.close() + + def get_name_locations(self, name): + """Return a list of ``(resource, lineno)`` tuples.""" + result = [] + modules = self.connection.execute( + "select module from names where name like (?)", (name,) + ).fetchall() + for module in modules: + try: + module_name = module[0] + if module_name.startswith(f"{self.project.name}."): + module_name = ".".join(module_name.split(".")) + pymodule = self.rope_project.get_module(module_name) + if name in pymodule: + pyname = pymodule[name] + module, lineno = pyname.get_definition_location() + if module is not None: + resource = module.get_module().get_resource() + if resource is not None and lineno is not None: + result.append((resource, lineno)) + except exceptions.ModuleNotFoundError: + pass + return result + + def 
clear_cache(self): + """Clear all entries in global-name cache. + + It might be a good idea to use this function before + regenerating global names. + + """ + self.connection.execute("drop table names") + self._setup_db() + self.connection.commit() + + def find_insertion_line(self, code): + """Guess at what line the new import should be inserted.""" + match = re.search(r"^(def|class)\s+", code) + if match is not None: + code = code[: match.start()] + try: + pymodule = libutils.get_string_module(self.rope_project, code) + except exceptions.ModuleSyntaxError: + return 1 + testmodname = "__rope_testmodule_rope" + importinfo = importutils.NormalImport(((testmodname, None),)) + module_imports = importutils.get_module_imports(self.rope_project, pymodule) + module_imports.add_import(importinfo) + code = module_imports.get_changed_source() + offset = code.index(testmodname) + lineno = code.count("\n", 0, offset) + 1 + return lineno + + def update_resource( + self, resource: Resource, underlined: bool = False, commit: bool = True + ): + """Update the cache for global names in `resource`.""" + underlined = underlined if underlined else self.underlined + module = self._resource_to_module(resource, underlined) + self._del_if_exist(module_name=module.modname, commit=False) + for name in get_names(module, self.project): + self._add_name(name) + if commit: + self.connection.commit() + + def _changed(self, resource): + if not resource.is_folder(): + self.update_resource(resource) + + def _moved(self, resource: Resource, newresource: Resource): + if not resource.is_folder(): + modname = self._resource_to_module(resource).modname + self._del_if_exist(modname) + self.update_resource(newresource) + + def _del_if_exist(self, module_name, commit: bool = True): + self.connection.execute("delete from names where module = ?", (module_name,)) + if commit: + self.connection.commit() + + def _get_python_folders(self) -> List[pathlib.Path]: + folders = 
self.rope_project.get_python_path_folders() + folder_paths = [ + pathlib.Path(folder.path) for folder in folders if folder.path != "/usr/bin" + ] + return list(OrderedDict.fromkeys(folder_paths)) + + def _get_available_packages(self) -> List[Package]: + packages: List[Package] = [ + Package(module, Source.BUILTIN, None, PackageType.BUILTIN) + for module in sys.builtin_module_names + ] + for folder in self._get_python_folders(): + for package in folder.iterdir(): + package_tuple = get_package_tuple(package, self.rope_project) + if package_tuple is None: + continue + packages.append(package_tuple) + return packages + + def _add_packages(self, packages: List[Package]): + for package in packages: + self.connection.execute("INSERT into packages values(?)", (package.name,)) + + def _get_existing(self) -> List[str]: + existing: List[str] = list( + chain(*self.connection.execute("select * from packages").fetchall()) + ) + existing.append(self.project.name) + return existing + + def _removed(self, resource): + if not resource.is_folder(): + modname = self._resource_to_module(resource).modname + self._del_if_exist(modname) + + def _add_future_names(self, names: Future[List[Name]]): + self._add_names(names.result()) + + def _add_names(self, names: Iterable[Name]): + for name in names: + self._add_name(name) + + def _add_name(self, name: Name): + self.connection.execute( + "insert into names values (?,?,?,?,?)", + ( + name.name, + name.modname, + name.package, + name.source.value, + name.name_type.value, + ), + ) + + def _find_package_path(self, target_name: str) -> Optional[Package]: + if target_name in sys.builtin_module_names: + return Package(target_name, Source.BUILTIN, None, PackageType.BUILTIN) + for folder in self._get_python_folders(): + for package in folder.iterdir(): + package_tuple = get_package_tuple(package, self.rope_project) + if package_tuple is None: + continue + name, source, package_path, package_type = package_tuple + if name == target_name: + return 
package_tuple + + return None + + def _resource_to_module( + self, resource: Resource, underlined: bool = False + ) -> ModuleFile: + assert self.project.path + underlined = underlined if underlined else self.underlined + resource_path: pathlib.Path = pathlib.Path(resource.real_path) + # The project doesn't need its name added to the path, + # since the standard python file layout accounts for that + # so we set add_package_name to False + resource_modname: str = get_modname_from_path( + resource_path, self.project.path, add_package_name=False + ) + return ModuleFile( + resource_path, + resource_modname, + underlined, + resource_path.name == "__init__.py", + ) diff --git a/rope/contrib/autoimport/defs.py b/rope/contrib/autoimport/defs.py new file mode 100644 index 000000000..b5559221c --- /dev/null +++ b/rope/contrib/autoimport/defs.py @@ -0,0 +1,118 @@ +"""Definitions of types for the Autoimport program.""" +import pathlib +from enum import Enum +from typing import NamedTuple, Optional + + +class Source(Enum): + """Describes the source of the package, for sorting purposes.""" + + PROJECT = 0 # Obviously any project packages come first + MANUAL = 1 # Placeholder since Autoimport classifies manually added modules + BUILTIN = 2 + STANDARD = 3 # We want to favor standard library items + SITE_PACKAGE = 4 + UNKNOWN = 5 + + # modified_time + + +class ModuleInfo(NamedTuple): + """Descriptor of information to get names from a module.""" + + filepath: Optional[pathlib.Path] + modname: str + underlined: bool + process_imports: bool + + +class ModuleFile(ModuleInfo): + """Descriptor of information to get names from a file using ast.""" + + filepath: pathlib.Path + modname: str + underlined: bool + process_imports: bool + + +class ModuleCompiled(ModuleInfo): + """Descriptor of information to get names using imports.""" + + filepath = None + modname: str + underlined: bool + process_imports: bool + + +class PackageType(Enum): + """Describes the type of package, to determine how 
to get the names from it.""" + + BUILTIN = 0 # No file exists, compiled into python. IE: Sys + STANDARD = 1 # Just a folder + COMPILED = 2 # .so module + SINGLE_FILE = 3 # a .py file + + +class NameType(Enum): + """Describes the type of Name for lsp completions. Taken from python lsp server.""" + + Text = 1 + Method = 2 + Function = 3 + Constructor = 4 + Field = 5 + Variable = 6 + Class = 7 + Interface = 8 + Module = 9 + Property = 10 + Unit = 11 + Value = 12 + Enum = 13 + Keyword = 14 + Snippet = 15 + Color = 16 + File = 17 + Reference = 18 + Folder = 19 + EnumMember = 20 + Constant = 21 + Struct = 22 + Event = 23 + Operator = 24 + TypeParameter = 25 + + +class Package(NamedTuple): + """Attributes of a package.""" + + name: str + source: Source + path: Optional[pathlib.Path] + type: PackageType + + +class Name(NamedTuple): + """A Name to be added to the database.""" + + name: str + modname: str + package: str + source: Source + name_type: NameType + + +class PartialName(NamedTuple): + """Partial information of a Name.""" + + name: str + name_type: NameType + + +class SearchResult(NamedTuple): + """Search Result.""" + + import_statement: str + name: str + source: int + itemkind: int diff --git a/rope/contrib/autoimport/parse.py b/rope/contrib/autoimport/parse.py new file mode 100644 index 000000000..433e54bf5 --- /dev/null +++ b/rope/contrib/autoimport/parse.py @@ -0,0 +1,159 @@ +""" +Functions to find importable names. + +Can extract names from source code of a python file, .so object, or builtin module. 
+""" + +import ast +import inspect +import logging +import pathlib +from importlib import import_module +from typing import Generator, List + +from .defs import ( + ModuleCompiled, + ModuleFile, + ModuleInfo, + Name, + NameType, + Package, + PartialName, + Source, +) + +logger = logging.getLogger(__name__) + + +def get_type_ast(node: ast.AST) -> NameType: + """Get the lsp type of a node.""" + if isinstance(node, ast.ClassDef): + return NameType.Class + if isinstance(node, ast.FunctionDef): + return NameType.Function + if isinstance(node, ast.Assign): + return NameType.Variable + return NameType.Variable # default value + + +def get_names_from_file( + module: pathlib.Path, + package_name: str = "", + underlined: bool = False, + process_imports: bool = False, +) -> Generator[PartialName, None, None]: + """Get all the names from a given file using ast.""" + try: + root_node = ast.parse(module.read_bytes()) + except SyntaxError as error: + print(error) + return + for node in ast.iter_child_nodes(root_node): + if isinstance(node, ast.Assign): + for target in node.targets: + try: + assert isinstance(target, ast.Name) + if underlined or not target.id.startswith("_"): + yield PartialName( + target.id, + get_type_ast(node), + ) + except (AttributeError, AssertionError): + # TODO handle tuple assignment + pass + elif isinstance(node, (ast.FunctionDef, ast.ClassDef)): + if underlined or not node.name.startswith("_"): + yield PartialName( + node.name, + get_type_ast(node), + ) + elif process_imports and isinstance(node, ast.ImportFrom): + # When we process imports, we want to include names in it's own package. 
+ if node.level == 0: + continue + if not node.module or package_name is node.module.split(".")[0]: + continue + for name in node.names: + if isinstance(name, ast.alias): + if name.asname: + real_name = name.asname + else: + real_name = name.name + else: + real_name = name + if underlined or not real_name.startswith("_"): + yield PartialName(real_name, get_type_ast(node)) + + +def get_type_object(imported_object) -> NameType: + """Determine the type of an object.""" + if inspect.isclass(imported_object): + return NameType.Class + if inspect.isfunction(imported_object) or inspect.isbuiltin(imported_object): + return NameType.Function + return NameType.Variable + + +def get_names(module: ModuleInfo, package: Package) -> List[Name]: + """Get all names from a module and package.""" + if isinstance(module, ModuleCompiled): + return list( + get_names_from_compiled(package.name, package.source, module.underlined) + ) + if isinstance(module, ModuleFile): + return [ + combine(package, module, partial_name) + for partial_name in get_names_from_file( + module.filepath, + package.name, + underlined=module.underlined, + process_imports=module.process_imports, + ) + ] + return [] + + +def get_names_from_compiled( + package: str, + source: Source, + underlined: bool = False, +) -> Generator[Name, None, None]: + """ + Get the names from a compiled module. + + Instead of using ast, it imports the module. + Parameters + ---------- + package : str + package to import. Must be in sys.path + underlined : bool + include underlined names + """ + # builtins is banned because you never have to import it + # python_crun is banned because it crashes python + banned = ["builtins", "python_crun"] + if package in banned or (package.startswith("_") and not underlined): + return # Builtins is redundant since you don't have to import it. 
+ if source not in (Source.BUILTIN, Source.STANDARD): + return + try: + module = import_module(str(package)) + except ImportError: + logger.error(f"{package} could not be imported for autoimport analysis") + return + else: + for name, value in inspect.getmembers(module): + if underlined or not name.startswith("_"): + if ( + inspect.isclass(value) + or inspect.isfunction(value) + or inspect.isbuiltin(value) + ): + yield Name( + str(name), package, package, source, get_type_object(value) + ) + + +def combine(package: Package, module: ModuleFile, name: PartialName) -> Name: + """Combine information to form a full name.""" + return Name(name.name, module.modname, package.name, package.source, name.name_type) diff --git a/rope/contrib/autoimport/utils.py b/rope/contrib/autoimport/utils.py new file mode 100644 index 000000000..bb49b6a49 --- /dev/null +++ b/rope/contrib/autoimport/utils.py @@ -0,0 +1,124 @@ +"""Utility functions for the autoimport code.""" +import pathlib +import sys +from collections import OrderedDict +from typing import Generator, List, Optional, Tuple + +from rope.base.project import Project + +from .defs import ModuleCompiled, ModuleFile, ModuleInfo, Package, PackageType, Source + + +def get_package_tuple( + package_path: pathlib.Path, project: Optional[Project] = None +) -> Optional[Package]: + """ + Get package name and type from a path. + + Checks for common issues, such as not being a viable python module + Returns None if not a viable package. 
+    """
+    package_name = package_path.name
+    # Hidden entries and bytecode caches are never importable packages.
+    if package_name.startswith(".") or package_name == "__pycache__":
+        return None
+    package_source = get_package_source(package_path, project)
+    if package_path.is_file():
+        if package_name.endswith(".so"):
+            # Keep only the part before the first dot of the file name,
+            # e.g. "zlib.cpython-310-x86_64-linux-gnu.so" -> "zlib".
+            name = package_name.split(".")[0]
+            return Package(name, package_source, package_path, PackageType.COMPILED)
+        if package_name.endswith(".py"):
+            stripped_name = package_path.stem
+            return Package(
+                stripped_name, package_source, package_path, PackageType.SINGLE_FILE
+            )
+        return None
+    # Packaging metadata directories sit next to real packages but hold no code.
+    if package_name.endswith((".egg-info", ".dist-info")):
+        return None
+    return Package(package_name, package_source, package_path, PackageType.STANDARD)
+
+
+def _is_within(path: pathlib.Path, root: pathlib.Path) -> bool:
+    """Return True if ``path`` is ``root`` itself or lies somewhere below it."""
+    return root == path or root in path.parents
+
+
+def get_package_source(
+    package: pathlib.Path, project: Optional[Project] = None
+) -> Source:
+    """Detect the source of a given package. Rudimentary implementation.
+
+    The checks run in order: inside the project directory, inside a
+    ``site-packages`` directory, inside the interpreter prefix.  Paths are
+    compared component by component, so a sibling directory that merely
+    shares a textual prefix does not match, and the comparison does not
+    depend on the platform's path separator.
+
+    Returns ``Source.UNKNOWN`` when none of the checks match.
+    """
+    if project is not None and _is_within(package, pathlib.Path(project.address)):
+        return Source.PROJECT
+    if "site-packages" in package.parts:
+        return Source.SITE_PACKAGE
+    # Inside a virtual environment ``sys.prefix`` points at the environment,
+    # while the standard library stays under ``sys.base_prefix``.
+    prefixes = (pathlib.Path(sys.prefix), pathlib.Path(sys.base_prefix))
+    if any(_is_within(package, prefix) for prefix in prefixes):
+        return Source.STANDARD
+    return Source.UNKNOWN
+
+
+def get_modname_from_path(
+    modpath: pathlib.Path, package_path: pathlib.Path, add_package_name: bool = True
+) -> str:
+    """Get module name from a path in respect to package.
+
+    The dotted name is built from the components of ``modpath`` relative to
+    ``package_path``.  An ``__init__`` file (with or without its ``.py``
+    suffix) stands for its parent package and adds no component of its own.
+    When ``add_package_name`` is true the stem of ``package_path`` is
+    prepended; if ``modpath`` is the package itself, the result is just that
+    package name.
+
+    Raises ``ValueError`` (from ``relative_to``) if ``modpath`` is not inside
+    ``package_path``.
+    """
+    package_name: str = package_path.stem
+    rel_path_parts = modpath.relative_to(package_path).parts
+    name_parts = list(rel_path_parts[:-1])
+    # Compare the stem, not the raw component: the last component of a real
+    # file is "__init__.py", which never equals "__init__".
+    if rel_path_parts and modpath.stem != "__init__":
+        name_parts.append(modpath.stem)
+    modname = ".".join(name_parts)
+    if add_package_name:
+        modname = f"{package_name}.{modname}" if modname else package_name
+    return modname
+
+
+def sort_and_deduplicate(results: List[Tuple[str, int]]) -> List[str]:
+    """Sort and deduplicate a list of name, source entries.
+
+    Entries are ordered by their last field (the source); the source is then
+    dropped and only the first occurrence of each name is kept.  The input
+    list is not modified.
+    """
+    ordered = sorted(results, key=lambda entry: entry[-1])
+    return list(OrderedDict.fromkeys(name for name, _source in ordered))
+
+
+def sort_and_deduplicate_tuple(
+    results: List[Tuple[str, str, int]]
+) -> List[Tuple[str, str]]:
+    """Sort and deduplicate a list of name, module, source entries.
+
+    Entries are ordered by their last field (the source); the source is then
+    dropped and only the first occurrence of each remaining tuple is kept.
+    The input list is not modified.
+    """
+    ordered = sorted(results, key=lambda entry: entry[-1])
+    return list(OrderedDict.fromkeys(entry[:-1] for entry in ordered))
+
+
+def should_parse(path: pathlib.Path, underlined: bool) -> bool:
+    """Return whether the module at ``path`` should be parsed.
+
+    When ``underlined`` is true every path is accepted.  Otherwise the path
+    is rejected if any of its components starts with an underscore.
+    """
+    if underlined:
+        return True
+    return not any(part.startswith("_") for part in path.parts)
+
+
+def get_files(
+    package: Package, underlined: bool = False
+) -> Generator[ModuleInfo, None, None]:
+    """Find all files to parse in a given path using __init__.py.
+
+    Compiled and builtin packages yield a single ``ModuleCompiled``, and only
+    when their source is standard or builtin.  A single-file package yields
+    itself.  Any other package yields one ``ModuleFile`` for each eligible
+    ``*.py`` file directly inside its directory.
+    """
+    if package.type in (PackageType.COMPILED, PackageType.BUILTIN):
+        if package.source in (Source.STANDARD, Source.BUILTIN):
+            yield ModuleCompiled(None, package.name, underlined, True)
+    elif package.type == PackageType.SINGLE_FILE:
+        assert package.path
+        assert package.path.suffix == ".py"
+        yield ModuleFile(package.path, package.path.stem, underlined, False)
+    else:
+        assert package.path
+        # Only names from the package directory downwards decide whether a
+        # file is private; an underscore-prefixed ancestor directory (e.g.
+        # "/srv/_envs/...") must not hide every module of the package.
+        package_root = package.path.parent
+        for file in package.path.glob("*.py"):
+            if file.name == "__init__.py":
+                yield ModuleFile(
+                    file,
+                    get_modname_from_path(file.parent, package.path),
+                    underlined,
+                    True,
+                )
+            elif should_parse(file.relative_to(package_root), underlined):
+                yield ModuleFile(
+                    file, get_modname_from_path(file, package.path), underlined, False
+                )
diff --git a/ropetest/contrib/autoimport/conftest.py b/ropetest/contrib/autoimport/conftest.py
new file mode 100644
index 000000000..c539eb325
--- /dev/null
+++ b/ropetest/contrib/autoimport/conftest.py
@@ -0,0 +1,57 @@
+import pathlib
+
+import pytest
+
+from ropetest import testutils
+
+
+@pytest.fixture
+def project():
+    """Yield a sample project and remove it once the test has finished."""
+    project = testutils.sample_project()
+    yield project
+    testutils.remove_project(project)
+
+
+@pytest.fixture
+def mod1(project):
+    """Yield a module named "mod1" created inside the sample project."""
+    mod1 = testutils.create_module(project, "mod1")
+    yield mod1
+
+
+@pytest.fixture
+def mod1_path(mod1):
+    """Yield ``mod1.real_path`` as a ``pathlib.Path``."""
+    yield pathlib.Path(mod1.real_path)
+
+
+@pytest.fixture
+def project_path(project):
+    """Yield ``project.address`` as a ``pathlib.Path``."""
+    yield pathlib.Path(project.address)
+
+
+@pytest.fixture
+def typing_path():
+    """Yield the path of the file the ``typing`` module was loaded from."""
+    import typing
+
+    yield pathlib.Path(typing.__file__)
+
+@pytest.fixture
+def build_env_path():
+    """Yield the path of the file the ``build.env`` module was loaded from."""
+    from build import env
+
+    yield pathlib.Path(env.__file__)
+
+
+@pytest.fixture
+def build_path():
+    """Yield the directory of the ``build`` package."""
+    import build
+
+    # build.__file__ points at the package's __init__.py, so the package
+    # directory is its parent.
+
+    yield pathlib.Path(build.__file__).parent
+
+
+@pytest.fixture
+def zlib_path():
+    """Yield the path of the file the ``zlib`` module was loaded from."""
+    import zlib
+
+    yield pathlib.Path(zlib.__file__)
diff --git a/ropetest/contrib/autoimport/parsetest.py b/ropetest/contrib/autoimport/parsetest.py
new file mode 100644
index 000000000..a5b82e699
--- /dev/null
+++ b/ropetest/contrib/autoimport/parsetest.py
@@ -0,0 +1,21 @@
+from rope.contrib.autoimport import parse
+from rope.contrib.autoimport.defs import Name, NameType, PartialName, Source
+
+
+def test_typing_names(typing_path):
+    """Parsing the ``typing`` source file must find ``Dict`` (as a variable)
+    and every name listed in ``typing.__all__``."""
+    names = list(parse.get_names_from_file(typing_path))
+    assert PartialName("Dict", NameType.Variable) in names
+    import typing
+    name_set = set(name.name for name in names)
+    for name in typing.__all__:
+        assert name in name_set
+
+
+def test_find_sys():
+    """Inspecting ``sys`` as a builtin must report ``exit`` as a function."""
+    names = list(parse.get_names_from_compiled("sys", Source.BUILTIN))
+    assert Name("exit", "sys", "sys", Source.BUILTIN, NameType.Function) in names
+
+
+def test_find_underlined():
+    """With ``underlined=True`` the underscore-prefixed ``os._exit`` must be
+    reported."""
+    names = list(parse.get_names_from_compiled("os", Source.BUILTIN, underlined=True))
+    assert Name("_exit", "os", "os", Source.BUILTIN, NameType.Function) in names
diff --git a/ropetest/contrib/autoimport/utilstest.py b/ropetest/contrib/autoimport/utilstest.py
new file mode 100644
index 000000000..423c0e754
--- /dev/null
+++ 
b/ropetest/contrib/autoimport/utilstest.py
@@ -0,0 +1,59 @@
+"""Tests for autoimport utility functions, written in pytest"""
+import pathlib
+
+from rope.contrib.autoimport import utils
+from rope.contrib.autoimport.defs import Package, PackageType, Source
+
+
+def test_get_package_source(mod1_path, project):
+    # A module path checked together with its project is a project source.
+    assert utils.get_package_source(mod1_path, project) == Source.PROJECT
+
+
+def test_get_package_source_not_project(mod1_path):
+    # Without a project argument the same path is not recognised.
+    assert utils.get_package_source(mod1_path) == Source.UNKNOWN
+
+
+def test_get_package_source_pytest(build_path):
+    # NOTE: despite the test name, the path checked here comes from the
+    # ``build_path`` fixture, not from pytest.  The package behind it is
+    # presumably third-party (confirm against the conftest fixture): it is
+    # not part of the standard library but is expected to be installed into
+    # site-packages, so it should return Source.SITE_PACKAGE
+    assert utils.get_package_source(build_path) == Source.SITE_PACKAGE
+
+
+def test_get_package_source_typing(typing_path):
+
+    assert utils.get_package_source(typing_path) == Source.STANDARD
+
+
+def test_get_modname_project_no_add(mod1_path, project_path):
+    # add_package_name=False: only the module part is returned.
+
+    assert utils.get_modname_from_path(mod1_path, project_path, False) == "mod1"
+
+
+def test_get_modname_single_file(typing_path):
+    # The module path and the package path are the same file here.
+
+    assert utils.get_modname_from_path(typing_path, typing_path) == "typing"
+
+
+def test_get_modname_folder(build_path, build_env_path):
+    # A module inside a package directory gets the package name as prefix.
+
+    assert utils.get_modname_from_path(build_env_path, build_path) == "build.env"
+
+
+def test_get_package_tuple_sample(project_path):
+    # No project is passed, so the expected source is UNKNOWN.
+    assert Package(
+        "sample_project", Source.UNKNOWN, project_path, PackageType.STANDARD
+    ) == utils.get_package_tuple(project_path)
+
+
+def test_get_package_tuple_typing(typing_path):
+
+    assert Package(
+        "typing", Source.STANDARD, typing_path, PackageType.SINGLE_FILE
+    ) == utils.get_package_tuple(typing_path)
+
+
+def test_get_package_tuple_compiled(zlib_path):
+    # The expected package name is just "zlib", whatever the file is called.
+    assert Package(
+        "zlib", Source.STANDARD, zlib_path, PackageType.COMPILED
+    ) == utils.get_package_tuple(zlib_path)
diff --git a/ropetest/contrib/autoimporttest.py b/ropetest/contrib/autoimporttest.py
index 995eba6b9..5d7b2eef2 100644
--- 
a/ropetest/contrib/autoimporttest.py +++ b/ropetest/contrib/autoimporttest.py @@ -3,8 +3,8 @@ except ImportError: import unittest -from ropetest import testutils from rope.contrib import autoimport +from ropetest import testutils class AutoImportTest(unittest.TestCase): @@ -28,11 +28,6 @@ def test_update_resource(self): self.importer.update_resource(self.mod1) self.assertEqual([("myvar", "mod1")], self.importer.import_assist("myva")) - def test_update_module(self): - self.mod1.write("myvar = None") - self.importer.update_module("mod1") - self.assertEqual([("myvar", "mod1")], self.importer.import_assist("myva")) - def test_update_non_existent_module(self): self.importer.update_module("does_not_exists_this") self.assertEqual([], self.importer.import_assist("myva")) @@ -118,11 +113,50 @@ def test_name_locations_with_multiple_occurrences(self): def test_handling_builtin_modules(self): self.importer.update_module("sys") - self.assertTrue("sys" in self.importer.get_modules("exit")) - - def test_submodules(self): - self.assertEqual(set([self.mod1]), autoimport.submodules(self.mod1)) - self.assertEqual(set([self.mod2, self.pkg]), autoimport.submodules(self.pkg)) + self.assertIn("sys", self.importer.get_modules("exit")) + + def test_search_submodule(self): + self.importer.update_module("build") + import_statement = ("from build import env", "env") + self.assertIn(import_statement, self.importer.search("env", exact_match=True)) + self.assertIn(import_statement, self.importer.search("en")) + self.assertIn(import_statement, self.importer.search("env")) + + def test_search_module(self): + self.importer.update_module("os") + import_statement = ("import os", "os") + self.assertIn(import_statement, self.importer.search("os", exact_match=True)) + self.assertIn(import_statement, self.importer.search("os")) + self.assertIn(import_statement, self.importer.search("o")) + + def test_search(self): + self.importer.update_module("typing") + import_statement = ("from typing import Dict", 
"Dict") + self.assertIn(import_statement, self.importer.search("Dict", exact_match=True)) + self.assertIn(import_statement, self.importer.search("Dict")) + self.assertIn(import_statement, self.importer.search("Dic")) + self.assertIn(import_statement, self.importer.search("Di")) + self.assertIn(import_statement, self.importer.search("D")) + + def test_typing_all(self): + import typing + + self.importer._del_if_exist("typing") + self.importer.generate_modules_cache(["typing"], single_thread=True) + for item in typing.__all__: + self.assertIn( + (f"from typing import {item}", item), + self.importer.search(item, exact_match=True), + ) + + def test_generate_full_cache(self): + """The single thread test takes much longer than the multithread test but is easier to debug""" + single_thread = False + self.importer.generate_modules_cache(single_thread=single_thread) + self.assertIn(("from typing import Dict", "Dict"), self.importer.search("Dict")) + self.assertTrue(len(self.importer._dump_all()) > 0) + for table in self.importer._dump_all(): + self.assertTrue(len(table) > 0) class AutoImportObservingTest(unittest.TestCase):