From ad8cbfcd22e9939567baafeff2cab0a502bbacc4 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Fri, 30 Jul 2021 13:28:08 -0500 Subject: [PATCH 1/4] * system_client logic -> local_filesystem adapter * basic storage_adapter logic for dynamic loading --- core/dbt/adapters/TODO.md | 1 + .../internal_storage/local_filesystem.py | 557 ++++++++++++++++++ core/dbt/clients/git.py | 5 +- core/dbt/clients/storage_adapter.py | 26 + core/dbt/compilation.py | 6 +- core/dbt/config/profile.py | 4 +- core/dbt/config/project.py | 12 +- core/dbt/config/selectors.py | 14 +- 8 files changed, 602 insertions(+), 23 deletions(-) create mode 100644 core/dbt/adapters/TODO.md create mode 100644 core/dbt/adapters/internal_storage/local_filesystem.py create mode 100644 core/dbt/clients/storage_adapter.py diff --git a/core/dbt/adapters/TODO.md b/core/dbt/adapters/TODO.md new file mode 100644 index 00000000000..58b03f1a586 --- /dev/null +++ b/core/dbt/adapters/TODO.md @@ -0,0 +1 @@ +Move the warehouse adapters to a subfolder of adapters (adapters/warehouse). diff --git a/core/dbt/adapters/internal_storage/local_filesystem.py b/core/dbt/adapters/internal_storage/local_filesystem.py new file mode 100644 index 00000000000..98692d605cd --- /dev/null +++ b/core/dbt/adapters/internal_storage/local_filesystem.py @@ -0,0 +1,557 @@ +""" +## +# Storage adaptor to hold all state in the local filesystem +# +# TODO: +# Simplify public methods +## +""" +import errno +import fnmatch +import json +import os +import os.path +import re +import shutil +import subprocess +import sys +import tarfile +import requests +import stat +from typing import Type, NoReturn, List, Optional, Dict, Any, Tuple, Callable, Union + +import dbt.exceptions +import dbt.utils + +from dbt.logger import GLOBAL_LOGGER as logger + +if sys.platform == "win32": + from ctypes import WinDLL, c_bool +else: + WinDLL = None + c_bool = None + + +class Adapter: + + # Re-write as regex searches on key + def find_matching( + root_path: str, + relative_paths_to_search: List[str], + file_pattern: str, + ) -> List[Dict[str, str]]: + """ + Given an absolute `root_path`, a list of relative paths to that + absolute root path (`relative_paths_to_search`), and a `file_pattern` + like '*.sql', returns information about the files. 
For example: + + > find_matching('/root/path', ['models'], '*.sql') + + [ { 'absolute_path': '/root/path/models/model_one.sql', + 'relative_path': 'model_one.sql', + 'searched_path': 'models' }, + { 'absolute_path': '/root/path/models/subdirectory/model_two.sql', + 'relative_path': 'subdirectory/model_two.sql', + 'searched_path': 'models' } ] + """ + matching = [] + root_path = os.path.normpath(root_path) + regex = fnmatch.translate(file_pattern) + reobj = re.compile(regex, re.IGNORECASE) + + for relative_path_to_search in relative_paths_to_search: + absolute_path_to_search = os.path.join(root_path, relative_path_to_search) + walk_results = os.walk(absolute_path_to_search) + + for current_path, subdirectories, local_files in walk_results: + for local_file in local_files: + absolute_path = os.path.join(current_path, local_file) + relative_path = os.path.relpath( + absolute_path, absolute_path_to_search + ) + if reobj.match(local_file): + matching.append( + { + "searched_path": relative_path_to_search, + "absolute_path": absolute_path, + "relative_path": relative_path, + } + ) + + return matching + + # switch to read + def load_file_contents(path: str, strip: bool = True) -> str: + # sys.stdout.write("Using local file adapter\n") + path = Adapter._convert_path(path) + with open(path, "rb") as handle: + to_return = handle.read().decode("utf-8") + + if strip: + to_return = to_return.strip() + + return to_return + + # switch to write + def make_directory(path: str) -> None: + """ + Make a directory and any intermediate directories that don't already + exist. This function handles the case where two threads try to create + a directory at once. + """ + path = Adapter._convert_path(path) + if not os.path.exists(path): + # concurrent writes that try to create the same dir can fail + try: + os.makedirs(path) + + except OSError as e: + if e.errno == errno.EEXIST: + pass + else: + raise e + + # switch to write + def make_file(path: str, contents: str = "", overwrite: bool = False) -> bool: + """ + Make a file at `path` assuming that the directory it resides in already + exists. The file is saved with contents `contents` + """ + if overwrite or not os.path.exists(path): + path = Adapter._convert_path(path) + with open(path, "w") as fh: + fh.write(contents) + return True + + return False + + # move to codebase (symlinks only used in deps) + def make_symlink(source: str, link_path: str) -> None: + """ + Create a symlink at `link_path` referring to `source`. + """ + if not Adapter.supports_symlinks(): + dbt.exceptions.system_error("create a symbolic link") + + os.symlink(source, link_path) + + # move to codebase (symlinks only used in deps) + def supports_symlinks() -> bool: + return getattr(os, "symlink", None) is not None + + # switch to write + def write_file(path: str, contents: str = "") -> bool: + path = Adapter._convert_path(path) + try: + Adapter.make_directory(os.path.dirname(path)) + with open(path, "w", encoding="utf-8") as f: + f.write(str(contents)) + except Exception as exc: + # note that you can't just catch FileNotFound, because sometimes + # windows apparently raises something else. + # It's also not sufficient to look at the path length, because + # sometimes windows fails to write paths that are less than the length + # limit. So on windows, suppress all errors that happen from writing + # to disk. + if os.name == "nt": + # sometimes we get a winerror of 3 which means the path was + # definitely too long, but other times we don't and it means the + # path was just probably too long. 
This is probably based on the + # windows/python version. + if getattr(exc, "winerror", 0) == 3: + reason = "Path was too long" + else: + reason = "Path was possibly too long" + # all our hard work and the path was still too long. Log and + # continue. + logger.debug( + f"Could not write to path {path}({len(path)} characters): " + f"{reason}\nexception: {exc}" + ) + else: + raise + return True + + # switch to read + def read_json(path: str) -> Dict[str, Any]: + return json.loads(Adapter.load_file_contents(path)) + + # switch to write + def write_json(path: str, data: Dict[str, Any]) -> bool: + return Adapter.write_file(path, json.dumps(data, cls=dbt.utils.JSONEncoder)) + + # helper + @staticmethod + def _windows_rmdir_readonly( + func: Callable[[str], Any], path: str, exc: Tuple[Any, OSError, Any] + ): + exception_val = exc[1] + if exception_val.errno == errno.EACCES: + os.chmod(path, stat.S_IWUSR) + func(path) + else: + raise + + # move to codebase (sys client) + @staticmethod + def resolve_path_from_base(path_to_resolve: str, base_path: str) -> str: + """ + If path-to_resolve is a relative path, create an absolute path + with base_path as the base. + + If path_to_resolve is an absolute path or a user path (~), just + resolve it to an absolute path and return. + """ + return os.path.abspath( + os.path.join(base_path, os.path.expanduser(path_to_resolve)) + ) + + # switch to delete + def rmdir(path: str) -> None: + """ + Recursively deletes a directory. Includes an error handler to retry with + different permissions on Windows. Otherwise, removing directories (eg. + cloned via git) can cause rmtree to throw a PermissionError exception + """ + path = Adapter._convert_path(path) + if sys.platform == "win32": + onerror = Adapter._windows_rmdir_readonly + else: + onerror = None + + shutil.rmtree(path, onerror=onerror) + + # helper + def _win_prepare_path(path: str) -> str: + """Given a windows path, prepare it for use by making sure it is absolute + and normalized. + """ + path = os.path.normpath(path) + + # if a path starts with '\', splitdrive() on it will return '' for the + # drive, but the prefix requires a drive letter. So let's add the drive + # letter back in. + # Unless it starts with '\\'. In that case, the path is a UNC mount point + # and splitdrive will be fine. + if not path.startswith("\\\\") and path.startswith("\\"): + curdrive = os.path.splitdrive(os.getcwd())[0] + path = curdrive + path + + # now our path is either an absolute UNC path or relative to the current + # directory. If it's relative, we need to make it absolute or the prefix + # won't work. `ntpath.abspath` allegedly doesn't always play nice with long + # paths, so do this instead. + if not os.path.splitdrive(path)[0]: + path = os.path.join(os.getcwd(), path) + + return path + + # helper + def _supports_long_paths() -> bool: + if sys.platform != "win32": + return True + # Eryk Sun says to use `WinDLL('ntdll')` instead of `windll.ntdll` because + # of pointer caching in a comment here: + # https://stackoverflow.com/a/35097999/11262881 + # I don't know exaclty what he means, but I am inclined to believe him as + # he's pretty active on Python windows bugs! + try: + dll = WinDLL("ntdll") + except OSError: # I don't think this happens? you need ntdll to run python + return False + # not all windows versions have it at all + if not hasattr(dll, "RtlAreLongPathsEnabled"): + return False + # tell windows we want to get back a single unsigned byte (a bool). 
+ dll.RtlAreLongPathsEnabled.restype = c_bool + return dll.RtlAreLongPathsEnabled() + + # helper + def _convert_path(path: str) -> str: + """Convert a path that dbt has, which might be >260 characters long, to one + that will be writable/readable on Windows. + + On other platforms, this is a no-op. + """ + # some parts of python seem to append '\*.*' to strings, better safe than + # sorry. + if len(path) < 250: + return path + if Adapter._supports_long_paths(): + return path + + prefix = "\\\\?\\" + # Nothing to do + if path.startswith(prefix): + return path + + path = Adapter._win_prepare_path(path) + + # add the prefix. The check is just in case os.getcwd() does something + # unexpected - I believe this if-state should always be True though! + if not path.startswith(prefix): + path = prefix + path + return path + + # switch to delete + def remove_file(path: str) -> None: + path = Adapter._convert_path(path) + os.remove(path) + + # switch to read/delete by returning None if path doesn't exist + def path_exists(path: str) -> bool: + path = Adapter._convert_path(path) + return os.path.lexists(path) + + # move to codebase (symlinks only used in deps) + def path_is_symlink(path: str) -> bool: + path = Adapter._convert_path(path) + return os.path.islink(path) + + # move to codebase (sys client) + def open_dir_cmd() -> str: + # https://docs.python.org/2/library/sys.html#sys.platform + if sys.platform == "win32": + return "start" + + elif sys.platform == "darwin": + return "open" + + else: + return "xdg-open" + + # helper` + def _handle_posix_cwd_error(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: + if exc.errno == errno.ENOENT: + message = "Directory does not exist" + elif exc.errno == errno.EACCES: + message = "Current user cannot access directory, check permissions" + elif exc.errno == errno.ENOTDIR: + message = "Not a directory" + else: + message = "Unknown OSError: {} - cwd".format(str(exc)) + raise dbt.exceptions.WorkingDirectoryError(cwd, cmd, message) + + # helper + def _handle_posix_cmd_error(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: + if exc.errno == errno.ENOENT: + message = "Could not find command, ensure it is in the user's PATH" + elif exc.errno == errno.EACCES: + message = "User does not have permissions for this command" + else: + message = "Unknown OSError: {} - cmd".format(str(exc)) + raise dbt.exceptions.ExecutableError(cwd, cmd, message) + + # helper + def _handle_posix_error(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: + """OSError handling for posix systems. + + Some things that could happen to trigger an OSError: + - cwd could not exist + - exc.errno == ENOENT + - exc.filename == cwd + - cwd could have permissions that prevent the current user moving to it + - exc.errno == EACCES + - exc.filename == cwd + - cwd could exist but not be a directory + - exc.errno == ENOTDIR + - exc.filename == cwd + - cmd[0] could not exist + - exc.errno == ENOENT + - exc.filename == None(?) + - cmd[0] could exist but have permissions that prevents the current + user from executing it (executable bit not set for the user) + - exc.errno == EACCES + - exc.filename == None(?) 
+ """ + if getattr(exc, "filename", None) == cwd: + Adapter._handle_posix_cwd_error(exc, cwd, cmd) + else: + Adapter._handle_posix_cmd_error(exc, cwd, cmd) + + # helper + def _handle_windows_error(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: + cls: Type[dbt.exceptions.Exception] = dbt.exceptions.CommandError + if exc.errno == errno.ENOENT: + message = ( + "Could not find command, ensure it is in the user's PATH " + "and that the user has permissions to run it" + ) + cls = dbt.exceptions.ExecutableError + elif exc.errno == errno.ENOEXEC: + message = "Command was not executable, ensure it is valid" + cls = dbt.exceptions.ExecutableError + elif exc.errno == errno.ENOTDIR: + message = ( + "Unable to cd: path does not exist, user does not have" + " permissions, or not a directory" + ) + cls = dbt.exceptions.WorkingDirectoryError + else: + message = 'Unknown error: {} (errno={}: "{}")'.format( + str(exc), exc.errno, errno.errorcode.get(exc.errno, "") + ) + raise cls(cwd, cmd, message) + + # helper + def _interpret_oserror(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: + """Interpret an OSError exc and raise the appropriate dbt exception.""" + if len(cmd) == 0: + raise dbt.exceptions.CommandError(cwd, cmd) + + # all of these functions raise unconditionally + if os.name == "nt": + Adapter._handle_windows_error(exc, cwd, cmd) + else: + Adapter._handle_posix_error(exc, cwd, cmd) + + # this should not be reachable, raise _something_ at least! + raise dbt.exceptions.InternalException( + "Unhandled exception in _interpret_oserror: {}".format(exc) + ) + + # move to codebase (sys client) + def run_cmd( + cwd: str, cmd: List[str], env: Optional[Dict[str, Any]] = None + ) -> Tuple[bytes, bytes]: + logger.debug('Executing "{}"'.format(" ".join(cmd))) + if len(cmd) == 0: + raise dbt.exceptions.CommandError(cwd, cmd) + + # the env argument replaces the environment entirely, which has exciting + # consequences on Windows! Do an update instead. 
+ full_env = env + if env is not None: + full_env = os.environ.copy() + full_env.update(env) + + try: + exe_pth = shutil.which(cmd[0]) + if exe_pth: + cmd = [os.path.abspath(exe_pth)] + list(cmd[1:]) + proc = subprocess.Popen( + cmd, + cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=full_env, + ) + + out, err = proc.communicate() + except OSError as exc: + Adapter._interpret_oserror(exc, cwd, cmd) + + logger.debug('STDOUT: "{!s}"'.format(out)) + logger.debug('STDERR: "{!s}"'.format(err)) + + if proc.returncode != 0: + logger.debug("command return code={}".format(proc.returncode)) + raise dbt.exceptions.CommandResultError(cwd, cmd, proc.returncode, out, err) + + return out, err + + # move to codebase (sys client) + def download( + url: str, path: str, timeout: Optional[Union[float, tuple]] = None + ) -> None: + path = Adapter._convert_path(path) + connection_timeout = timeout or float(os.getenv("DBT_HTTP_TIMEOUT", 10)) + response = requests.get(url, timeout=connection_timeout) + with open(path, "wb") as handle: + for block in response.iter_content(1024 * 64): + handle.write(block) + + # Remove entirely (dead code I think) + def rename(from_path: str, to_path: str, force: bool = False) -> None: + from_path = Adapter._convert_path(from_path) + to_path = Adapter._convert_path(to_path) + is_symlink = Adapter.path_is_symlink(to_path) + + if os.path.exists(to_path) and force: + if is_symlink: + Adapter.remove_file(to_path) + else: + Adapter.rmdir(to_path) + + shutil.move(from_path, to_path) + + # move to codebase (sys client) + def untar_package( + tar_path: str, dest_dir: str, rename_to: Optional[str] = None + ) -> None: + tar_path = Adapter._convert_path(tar_path) + tar_dir_name = None + with tarfile.open(tar_path, "r") as tarball: + tarball.extractall(dest_dir) + tar_dir_name = os.path.commonprefix(tarball.getnames()) + if rename_to: + downloaded_path = os.path.join(dest_dir, tar_dir_name) + desired_path = os.path.join(dest_dir, rename_to) + dbt.clients.system.rename(downloaded_path, desired_path, force=True) + + # helper + def _chmod_and_retry(func, path, exc_info): + """Define an error handler to pass to shutil.rmtree. + On Windows, when a file is marked read-only as git likes to do, rmtree will + fail. To handle that, on errors try to make the file writable. + We want to retry most operations here, but listdir is one that we know will + be useless. + """ + if func is os.listdir or os.name != "nt": + raise + os.chmod(path, stat.S_IREAD | stat.S_IWRITE) + # on error,this will raise. + func(path) + + # helper + def _absnorm(path): + return os.path.normcase(os.path.abspath(path)) + + # move to codebase (move only used in install) + def move(src, dst): + """A re-implementation of shutil.move that properly removes the source + directory on windows when it has read-only files in it and the move is + between two drives. + + This is almost identical to the real shutil.move, except it uses our rmtree + and skips handling non-windows OSes since the existing one works ok there. 
+ """ + src = Adapter._convert_path(src) + dst = Adapter._convert_path(dst) + if os.name != "nt": + return shutil.move(src, dst) + + if os.path.isdir(dst): + if Adapter._absnorm(src) == Adapter._absnorm(dst): + os.rename(src, dst) + return + + dst = os.path.join(dst, os.path.basename(src.rstrip("/\\"))) + if os.path.exists(dst): + raise EnvironmentError("Path '{}' already exists".format(dst)) + + try: + os.rename(src, dst) + except OSError: + # probably different drives + if os.path.isdir(src): + if Adapter._absnorm(dst + "\\").startswith( + Adapter._absnorm(src + "\\") + ): + # dst is inside src + raise EnvironmentError( + "Cannot move a directory '{}' into itself '{}'".format(src, dst) + ) + shutil.copytree(src, dst, symlinks=True) + Adapter.rmtree(src) + else: + shutil.copy2(src, dst) + os.unlink(src) + + # move to codebase (sys client) + def rmtree(path): + """Recursively remove path. On permissions errors on windows, try to remove + the read-only flag and try again. + """ + path = Adapter._convert_path(path) + return shutil.rmtree(path, onerror=Adapter._chmod_and_retry) diff --git a/core/dbt/clients/git.py b/core/dbt/clients/git.py index ba3bf2e1330..48247892f08 100644 --- a/core/dbt/clients/git.py +++ b/core/dbt/clients/git.py @@ -1,7 +1,8 @@ import re import os.path -from dbt.clients.system import run_cmd, rmdir +from dbt.clients.system import run_cmd +from dbt.clients.storage_adapter import StorageAdapter as SA from dbt.logger import GLOBAL_LOGGER as logger import dbt.exceptions from packaging import version @@ -42,7 +43,7 @@ def clone(repo, cwd, dirname=None, remove_git_dir=False, revision=None, subdirec run_cmd(os.path.join(cwd, dirname or ''), ['git', 'sparse-checkout', 'set', subdirectory]) if remove_git_dir: - rmdir(os.path.join(dirname, '.git')) + SA.adapter.rmdir(os.path.join(dirname, '.git')) return result diff --git a/core/dbt/clients/storage_adapter.py b/core/dbt/clients/storage_adapter.py new file mode 100644 index 00000000000..e828d283b77 --- /dev/null +++ b/core/dbt/clients/storage_adapter.py @@ -0,0 +1,26 @@ +from importlib import import_module +from dbt.adapters.internal_storage.local_filesystem import Adapter as configured_adapter + +DEFAULT_STORAGE_ADAPTER = "dbt.adapters.internal_storage.local_filesystem" +DEFAULT_STORAGE_ADAPTER_NAME = "local_filesystem_storage_adapter" + +import_module(DEFAULT_STORAGE_ADAPTER) + + +class StorageAdapter(): + def __init__(self) -> None: + # TODO: + # get the config + # ensure configured adapter exsists + # import configured adapter + # ensure configured adapter has the correct class signature + # ensure configured adapter can access it's datastore + pass + + # Dynamically generated class attribute that is the + # configured storage adapter + adapter = type( + DEFAULT_STORAGE_ADAPTER_NAME, + (configured_adapter,), + dict() + ) diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 5e0fa259f2a..8cba4e3fbc6 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -8,7 +8,7 @@ from dbt import flags from dbt.adapters.factory import get_adapter from dbt.clients import jinja -from dbt.clients.system import make_directory +from dbt.clients.storage_adapter import StorageAdapter as SA from dbt.context.providers import generate_runtime_model from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.compiled import ( @@ -153,8 +153,8 @@ def __init__(self, config): self.config = config def initialize(self): - make_directory(self.config.target_path) - make_directory(self.config.modules_path) + 
SA.adapter.make_directory(self.config.target_path) + SA.adapter.make_directory(self.config.modules_path) # creates a ModelContext which is converted to # a dict for jinja rendering of SQL diff --git a/core/dbt/config/profile.py b/core/dbt/config/profile.py index 8ba78ad7aad..f4207270380 100644 --- a/core/dbt/config/profile.py +++ b/core/dbt/config/profile.py @@ -4,7 +4,7 @@ from dbt.dataclass_schema import ValidationError -from dbt.clients.system import load_file_contents +from dbt.clients.storage_adapter import StorageAdapter as SA from dbt.clients.yaml_helper import load_yaml_text from dbt.contracts.connection import Credentials, HasCredentials from dbt.contracts.project import ProfileConfig, UserConfig @@ -52,7 +52,7 @@ def read_profile(profiles_dir: str) -> Dict[str, Any]: contents = None if os.path.isfile(path): try: - contents = load_file_contents(path, strip=False) + contents = SA.adapter.load_file_contents(path, strip=False) # type: ignore yaml_content = load_yaml_text(contents) if not yaml_content: msg = f'The profiles.yml file at {path} is empty' diff --git a/core/dbt/config/project.py b/core/dbt/config/project.py index bbd3e7a34c3..58b13d1d814 100644 --- a/core/dbt/config/project.py +++ b/core/dbt/config/project.py @@ -9,9 +9,7 @@ import hashlib import os -from dbt.clients.system import resolve_path_from_base -from dbt.clients.system import path_exists -from dbt.clients.system import load_file_contents +from dbt.clients.storage_adapter import StorageAdapter as SA from dbt.clients.yaml_helper import load_yaml_text from dbt.contracts.connection import QueryComment from dbt.exceptions import DbtProjectError @@ -77,16 +75,16 @@ class IsFQNResource(Protocol): def _load_yaml(path): - contents = load_file_contents(path) + contents = SA.adapter.load_file_contents(path) return load_yaml_text(contents) def package_data_from_root(project_root): - package_filepath = resolve_path_from_base( + package_filepath = SA.adapter.resolve_path_from_base( 'packages.yml', project_root ) - if path_exists(package_filepath): + if SA.adapter.path_exists(package_filepath): packages_dict = _load_yaml(package_filepath) else: packages_dict = None @@ -149,7 +147,7 @@ def _raw_project_from(project_root: str) -> Dict[str, Any]: project_yaml_filepath = os.path.join(project_root, 'dbt_project.yml') # get the project.yml contents - if not path_exists(project_yaml_filepath): + if not SA.adapter.path_exists(project_yaml_filepath): # type: ignore # noqa raise DbtProjectError( 'no dbt_project.yml found at expected path {}' .format(project_yaml_filepath) diff --git a/core/dbt/config/selectors.py b/core/dbt/config/selectors.py index 272a62edba4..8329d7a4510 100644 --- a/core/dbt/config/selectors.py +++ b/core/dbt/config/selectors.py @@ -7,11 +7,7 @@ from .renderer import SelectorRenderer -from dbt.clients.system import ( - load_file_contents, - path_exists, - resolve_path_from_base, -) +from dbt.clients.storage_adapter import StorageAdapter as SA from dbt.contracts.selection import SelectorFile from dbt.exceptions import DbtSelectorsError, RuntimeException from dbt.graph import parse_from_selectors_definition, SelectionSpec @@ -75,7 +71,7 @@ def from_path( cls, path: Path, renderer: SelectorRenderer, ) -> 'SelectorConfig': try: - data = load_yaml_text(load_file_contents(str(path))) + data = load_yaml_text(SA.adapter.load_file_contents(str(path))) # type: ignore except (ValidationError, RuntimeException) as exc: raise DbtSelectorsError( f'Could not read selector file: {exc}', @@ -91,12 +87,12 @@ def from_path( def 
selector_data_from_root(project_root: str) -> Dict[str, Any]: - selector_filepath = resolve_path_from_base( + selector_filepath = SA.adapter.resolve_path_from_base( # type: ignore 'selectors.yml', project_root ) - if path_exists(selector_filepath): - selectors_dict = load_yaml_text(load_file_contents(selector_filepath)) + if SA.adapter.path_exists(selector_filepath): # type: ignore + selectors_dict = load_yaml_text(SA.adapter.load_file_contents(selector_filepath)) # type: ignore # noqa else: selectors_dict = None return selectors_dict From a1af19af68572f3f85dff0e0d94bc4b87c8467e3 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Wed, 4 Aug 2021 08:29:12 -0500 Subject: [PATCH 2/4] Local FS storage adapter, pass 2- better logic, lots of edge cases fixed, type hinting --- .../internal_storage/local_filesystem.py | 707 +++++------------- core/dbt/clients/git.py | 4 +- core/dbt/clients/storage.py | 27 + core/dbt/clients/storage_adapter.py | 26 - core/dbt/clients/storage_adapter_type.py | 37 + core/dbt/compilation.py | 6 +- core/dbt/config/profile.py | 4 +- core/dbt/config/project.py | 11 +- core/dbt/config/selectors.py | 11 +- core/dbt/deps/base.py | 6 +- core/dbt/deps/registry.py | 3 +- core/dbt/parser/manifest.py | 6 +- core/dbt/parser/read_files.py | 6 +- core/dbt/task/debug.py | 3 +- core/dbt/task/deps.py | 4 +- core/dbt/task/init.py | 3 +- 16 files changed, 283 insertions(+), 581 deletions(-) create mode 100644 core/dbt/clients/storage.py delete mode 100644 core/dbt/clients/storage_adapter.py create mode 100644 core/dbt/clients/storage_adapter_type.py diff --git a/core/dbt/adapters/internal_storage/local_filesystem.py b/core/dbt/adapters/internal_storage/local_filesystem.py index 98692d605cd..519f457d4e9 100644 --- a/core/dbt/adapters/internal_storage/local_filesystem.py +++ b/core/dbt/adapters/internal_storage/local_filesystem.py @@ -1,164 +1,136 @@ -""" -## -# Storage adaptor to hold all state in the local filesystem -# -# TODO: -# Simplify public methods -## -""" -import errno -import fnmatch -import json -import os -import os.path -import re -import shutil -import subprocess -import sys -import tarfile -import requests -import stat -from typing import Type, NoReturn, List, Optional, Dict, Any, Tuple, Callable, Union - -import dbt.exceptions -import dbt.utils +from pathlib import Path +from shutil import rmtree +from stat import S_IRUSR, S_IWUSR +from sys import platform +from typing import Any, Union +from typing_extensions import Literal from dbt.logger import GLOBAL_LOGGER as logger -if sys.platform == "win32": +# windows platforms as returned by sys.platform +# via https://stackoverflow.com/a/13874620 +WINDOWS_PLATFORMS = ("win32", "cygwin", "msys") + +# load ctypes for windows platforms +if platform in WINDOWS_PLATFORMS: from ctypes import WinDLL, c_bool else: WinDLL = None c_bool = None -class Adapter: - - # Re-write as regex searches on key - def find_matching( - root_path: str, - relative_paths_to_search: List[str], - file_pattern: str, - ) -> List[Dict[str, str]]: - """ - Given an absolute `root_path`, a list of relative paths to that - absolute root path (`relative_paths_to_search`), and a `file_pattern` - like '*.sql', returns information about the files. 
For example: - - > find_matching('/root/path', ['models'], '*.sql') - - [ { 'absolute_path': '/root/path/models/model_one.sql', - 'relative_path': 'model_one.sql', - 'searched_path': 'models' }, - { 'absolute_path': '/root/path/models/subdirectory/model_two.sql', - 'relative_path': 'subdirectory/model_two.sql', - 'searched_path': 'models' } ] - """ - matching = [] - root_path = os.path.normpath(root_path) - regex = fnmatch.translate(file_pattern) - reobj = re.compile(regex, re.IGNORECASE) - - for relative_path_to_search in relative_paths_to_search: - absolute_path_to_search = os.path.join(root_path, relative_path_to_search) - walk_results = os.walk(absolute_path_to_search) - - for current_path, subdirectories, local_files in walk_results: - for local_file in local_files: - absolute_path = os.path.join(current_path, local_file) - relative_path = os.path.relpath( - absolute_path, absolute_path_to_search - ) - if reobj.match(local_file): - matching.append( - { - "searched_path": relative_path_to_search, - "absolute_path": absolute_path, - "relative_path": relative_path, - } - ) - - return matching - - # switch to read - def load_file_contents(path: str, strip: bool = True) -> str: - # sys.stdout.write("Using local file adapter\n") - path = Adapter._convert_path(path) - with open(path, "rb") as handle: - to_return = handle.read().decode("utf-8") - - if strip: - to_return = to_return.strip() - - return to_return - - # switch to write - def make_directory(path: str) -> None: - """ - Make a directory and any intermediate directories that don't already - exist. This function handles the case where two threads try to create - a directory at once. - """ - path = Adapter._convert_path(path) - if not os.path.exists(path): - # concurrent writes that try to create the same dir can fail - try: - os.makedirs(path) - - except OSError as e: - if e.errno == errno.EEXIST: - pass - else: - raise e - - # switch to write - def make_file(path: str, contents: str = "", overwrite: bool = False) -> bool: - """ - Make a file at `path` assuming that the directory it resides in already - exists. The file is saved with contents `contents` - """ - if overwrite or not os.path.exists(path): - path = Adapter._convert_path(path) - with open(path, "w") as fh: - fh.write(contents) - return True +def ready_check() -> bool: + """Ensures the adapter is ready for use. + + Returns: + `True` if the resource is ready to be used. + `False` if the resource is not ready to be used. + + Raises: + TBD - TODO: How best to report back errors here? + It should never fail for a filesystem (unless we're using something very exotic), + but for databases this should be our primary source of troubleshooting information. + + """ + return True + + +def read( + path: str, + strip: bool = True, +) -> str: + """Reads the content of a file on the filesystem. + + Args: + path: Full path of file to be read. + strip: Wether or not to strip whitespace. + + Returns: + Content of the file + + """ + # create a concrete path object + path: Path = Path(path) + + # read the file in as a string, or none if not found + file_content = path.read_text(encoding="utf-8") + + # conditionally strip whitespace + file_content = file_content.strip() if strip else file_content + + return file_content + + +def write( + path: str, + content: Union[str, bytes, None], + overwrite: bool = False, +) -> bool: + """Writes the given content out to a resource on the filesystem. 
+ Since there are many different ways to approach filesystem operations, It's best to enumerate + the rules(tm): + + If the path to the file/dir doesn't exist, it will be created. + If content is `None` a directory will be created instead of a file. + If contet is not a `str` or `bytes` the string representation of the object will be written. + If the content is `str` it will be encoded as utf-8 + + Overwrites are only supported for files and are toggled by the overwrite parameter. + + All logical cases outside those outlined above will result in failure + + Args: + path: Full path of resource to be written. + content: Data to be written. + overwrite: Wether or not to overwrite if a file already exists at this path. + parser: A parser to apply to file data. + + Returns: + `True` for success, `False` otherwise. + + """ + # TODO: double check I hit all possible permutations here! (IK) + + # create a concrete path object + path: Path = Path(path) + + # handle overwrite errors for files and directories + if path.exists() and (overwrite is False or path.is_dir() is True): + logger.debug(f"{path} already exists") return False - # move to codebase (symlinks only used in deps) - def make_symlink(source: str, link_path: str) -> None: - """ - Create a symlink at `link_path` referring to `source`. - """ - if not Adapter.supports_symlinks(): - dbt.exceptions.system_error("create a symbolic link") + # handle trying to write file content to a path that is a directory + if path.is_dir() and content is not None: + logger.debug(f"{path} is a directory, but file content was specified") + return False - os.symlink(source, link_path) + # create a directory if the content is `None` + if content is None: + path.mkdir(parents=True, exist_ok=True) - # move to codebase (symlinks only used in deps) - def supports_symlinks() -> bool: - return getattr(os, "symlink", None) is not None + # create an empty file if the content is an empty string + elif content == "" and path.exists() is False: + path.touch() - # switch to write - def write_file(path: str, contents: str = "") -> bool: - path = Adapter._convert_path(path) + # write out to a file + else: try: - Adapter.make_directory(os.path.dirname(path)) - with open(path, "w", encoding="utf-8") as f: - f.write(str(contents)) - except Exception as exc: - # note that you can't just catch FileNotFound, because sometimes - # windows apparently raises something else. - # It's also not sufficient to look at the path length, because - # sometimes windows fails to write paths that are less than the length - # limit. So on windows, suppress all errors that happen from writing - # to disk. - if os.name == "nt": + path.parent.mkdir(parents=True, exist_ok=True) + if type(content) == bytes: + path.write_bytes(content) + else: + path.write_text(str(content), encoding="utf-8") + except Exception as e: + # TODO: The block below was c/p'd directly from the old system client. + # We should examine if this actually makes sense to log file write failures and + # try to keep going. + if platform in WINDOWS_PLATFORMS: # sometimes we get a winerror of 3 which means the path was # definitely too long, but other times we don't and it means the # path was just probably too long. This is probably based on the # windows/python version. - if getattr(exc, "winerror", 0) == 3: + if getattr(e, "winerror", 0) == 3: reason = "Path was too long" else: reason = "Path was possibly too long" @@ -166,392 +138,81 @@ def write_file(path: str, contents: str = "") -> bool: # continue. 
logger.debug( f"Could not write to path {path}({len(path)} characters): " - f"{reason}\nexception: {exc}" + f"{reason}\nexception: {e}" ) else: raise - return True - - # switch to read - def read_json(path: str) -> Dict[str, Any]: - return json.loads(Adapter.load_file_contents(path)) - - # switch to write - def write_json(path: str, data: Dict[str, Any]) -> bool: - return Adapter.write_file(path, json.dumps(data, cls=dbt.utils.JSONEncoder)) - - # helper - @staticmethod - def _windows_rmdir_readonly( - func: Callable[[str], Any], path: str, exc: Tuple[Any, OSError, Any] - ): - exception_val = exc[1] - if exception_val.errno == errno.EACCES: - os.chmod(path, stat.S_IWUSR) - func(path) - else: - raise - - # move to codebase (sys client) - @staticmethod - def resolve_path_from_base(path_to_resolve: str, base_path: str) -> str: - """ - If path-to_resolve is a relative path, create an absolute path - with base_path as the base. - - If path_to_resolve is an absolute path or a user path (~), just - resolve it to an absolute path and return. - """ - return os.path.abspath( - os.path.join(base_path, os.path.expanduser(path_to_resolve)) - ) - - # switch to delete - def rmdir(path: str) -> None: - """ - Recursively deletes a directory. Includes an error handler to retry with - different permissions on Windows. Otherwise, removing directories (eg. - cloned via git) can cause rmtree to throw a PermissionError exception - """ - path = Adapter._convert_path(path) - if sys.platform == "win32": - onerror = Adapter._windows_rmdir_readonly - else: - onerror = None - - shutil.rmtree(path, onerror=onerror) - - # helper - def _win_prepare_path(path: str) -> str: - """Given a windows path, prepare it for use by making sure it is absolute - and normalized. - """ - path = os.path.normpath(path) - - # if a path starts with '\', splitdrive() on it will return '' for the - # drive, but the prefix requires a drive letter. So let's add the drive - # letter back in. - # Unless it starts with '\\'. In that case, the path is a UNC mount point - # and splitdrive will be fine. - if not path.startswith("\\\\") and path.startswith("\\"): - curdrive = os.path.splitdrive(os.getcwd())[0] - path = curdrive + path - - # now our path is either an absolute UNC path or relative to the current - # directory. If it's relative, we need to make it absolute or the prefix - # won't work. `ntpath.abspath` allegedly doesn't always play nice with long - # paths, so do this instead. - if not os.path.splitdrive(path)[0]: - path = os.path.join(os.getcwd(), path) - - return path - - # helper - def _supports_long_paths() -> bool: - if sys.platform != "win32": - return True - # Eryk Sun says to use `WinDLL('ntdll')` instead of `windll.ntdll` because - # of pointer caching in a comment here: - # https://stackoverflow.com/a/35097999/11262881 - # I don't know exaclty what he means, but I am inclined to believe him as - # he's pretty active on Python windows bugs! - try: - dll = WinDLL("ntdll") - except OSError: # I don't think this happens? you need ntdll to run python - return False - # not all windows versions have it at all - if not hasattr(dll, "RtlAreLongPathsEnabled"): - return False - # tell windows we want to get back a single unsigned byte (a bool). - dll.RtlAreLongPathsEnabled.restype = c_bool - return dll.RtlAreLongPathsEnabled() - - # helper - def _convert_path(path: str) -> str: - """Convert a path that dbt has, which might be >260 characters long, to one - that will be writable/readable on Windows. - - On other platforms, this is a no-op. 
- """ - # some parts of python seem to append '\*.*' to strings, better safe than - # sorry. - if len(path) < 250: - return path - if Adapter._supports_long_paths(): - return path - - prefix = "\\\\?\\" - # Nothing to do - if path.startswith(prefix): - return path - - path = Adapter._win_prepare_path(path) - - # add the prefix. The check is just in case os.getcwd() does something - # unexpected - I believe this if-state should always be True though! - if not path.startswith(prefix): - path = prefix + path - return path - - # switch to delete - def remove_file(path: str) -> None: - path = Adapter._convert_path(path) - os.remove(path) - - # switch to read/delete by returning None if path doesn't exist - def path_exists(path: str) -> bool: - path = Adapter._convert_path(path) - return os.path.lexists(path) - - # move to codebase (symlinks only used in deps) - def path_is_symlink(path: str) -> bool: - path = Adapter._convert_path(path) - return os.path.islink(path) - - # move to codebase (sys client) - def open_dir_cmd() -> str: - # https://docs.python.org/2/library/sys.html#sys.platform - if sys.platform == "win32": - return "start" - - elif sys.platform == "darwin": - return "open" + return True + + +def delete(path: str) -> bool: + """Deletes the resource at the given path + + Args: + path: Full path of resource to be deleted + + """ + # create concrete path object + path: Path = Path(path) + + # ensure resource to be deleted exists + if not path.exists(): + return False + + # remove files + if path.is_file(): + path.unlink() + + # remove directories recuersivly, surprisingly obnoxious to do in a cross-platform safe manner + if path.is_dir(): + if platform in WINDOWS_PLATFORMS: + # error handling for permissions on windows platforms + def on_error(func, path, _): + path.chmod(path, S_IWUSR | S_IRUSR) + func(path) + + rmtree(path, onerror=on_error) else: - return "xdg-open" - - # helper` - def _handle_posix_cwd_error(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: - if exc.errno == errno.ENOENT: - message = "Directory does not exist" - elif exc.errno == errno.EACCES: - message = "Current user cannot access directory, check permissions" - elif exc.errno == errno.ENOTDIR: - message = "Not a directory" - else: - message = "Unknown OSError: {} - cwd".format(str(exc)) - raise dbt.exceptions.WorkingDirectoryError(cwd, cmd, message) - - # helper - def _handle_posix_cmd_error(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: - if exc.errno == errno.ENOENT: - message = "Could not find command, ensure it is in the user's PATH" - elif exc.errno == errno.EACCES: - message = "User does not have permissions for this command" - else: - message = "Unknown OSError: {} - cmd".format(str(exc)) - raise dbt.exceptions.ExecutableError(cwd, cmd, message) - - # helper - def _handle_posix_error(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: - """OSError handling for posix systems. - - Some things that could happen to trigger an OSError: - - cwd could not exist - - exc.errno == ENOENT - - exc.filename == cwd - - cwd could have permissions that prevent the current user moving to it - - exc.errno == EACCES - - exc.filename == cwd - - cwd could exist but not be a directory - - exc.errno == ENOTDIR - - exc.filename == cwd - - cmd[0] could not exist - - exc.errno == ENOENT - - exc.filename == None(?) - - cmd[0] could exist but have permissions that prevents the current - user from executing it (executable bit not set for the user) - - exc.errno == EACCES - - exc.filename == None(?) 
- """ - if getattr(exc, "filename", None) == cwd: - Adapter._handle_posix_cwd_error(exc, cwd, cmd) - else: - Adapter._handle_posix_cmd_error(exc, cwd, cmd) - - # helper - def _handle_windows_error(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: - cls: Type[dbt.exceptions.Exception] = dbt.exceptions.CommandError - if exc.errno == errno.ENOENT: - message = ( - "Could not find command, ensure it is in the user's PATH " - "and that the user has permissions to run it" - ) - cls = dbt.exceptions.ExecutableError - elif exc.errno == errno.ENOEXEC: - message = "Command was not executable, ensure it is valid" - cls = dbt.exceptions.ExecutableError - elif exc.errno == errno.ENOTDIR: - message = ( - "Unable to cd: path does not exist, user does not have" - " permissions, or not a directory" - ) - cls = dbt.exceptions.WorkingDirectoryError - else: - message = 'Unknown error: {} (errno={}: "{}")'.format( - str(exc), exc.errno, errno.errorcode.get(exc.errno, "") - ) - raise cls(cwd, cmd, message) - - # helper - def _interpret_oserror(exc: OSError, cwd: str, cmd: List[str]) -> NoReturn: - """Interpret an OSError exc and raise the appropriate dbt exception.""" - if len(cmd) == 0: - raise dbt.exceptions.CommandError(cwd, cmd) - - # all of these functions raise unconditionally - if os.name == "nt": - Adapter._handle_windows_error(exc, cwd, cmd) - else: - Adapter._handle_posix_error(exc, cwd, cmd) - - # this should not be reachable, raise _something_ at least! - raise dbt.exceptions.InternalException( - "Unhandled exception in _interpret_oserror: {}".format(exc) - ) - - # move to codebase (sys client) - def run_cmd( - cwd: str, cmd: List[str], env: Optional[Dict[str, Any]] = None - ) -> Tuple[bytes, bytes]: - logger.debug('Executing "{}"'.format(" ".join(cmd))) - if len(cmd) == 0: - raise dbt.exceptions.CommandError(cwd, cmd) - - # the env argument replaces the environment entirely, which has exciting - # consequences on Windows! Do an update instead. 
- full_env = env - if env is not None: - full_env = os.environ.copy() - full_env.update(env) + rmtree(path) - try: - exe_pth = shutil.which(cmd[0]) - if exe_pth: - cmd = [os.path.abspath(exe_pth)] + list(cmd[1:]) - proc = subprocess.Popen( - cmd, - cwd=cwd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env=full_env, - ) - - out, err = proc.communicate() - except OSError as exc: - Adapter._interpret_oserror(exc, cwd, cmd) - - logger.debug('STDOUT: "{!s}"'.format(out)) - logger.debug('STDERR: "{!s}"'.format(err)) - - if proc.returncode != 0: - logger.debug("command return code={}".format(proc.returncode)) - raise dbt.exceptions.CommandResultError(cwd, cmd, proc.returncode, out, err) - - return out, err - - # move to codebase (sys client) - def download( - url: str, path: str, timeout: Optional[Union[float, tuple]] = None - ) -> None: - path = Adapter._convert_path(path) - connection_timeout = timeout or float(os.getenv("DBT_HTTP_TIMEOUT", 10)) - response = requests.get(url, timeout=connection_timeout) - with open(path, "wb") as handle: - for block in response.iter_content(1024 * 64): - handle.write(block) - - # Remove entirely (dead code I think) - def rename(from_path: str, to_path: str, force: bool = False) -> None: - from_path = Adapter._convert_path(from_path) - to_path = Adapter._convert_path(to_path) - is_symlink = Adapter.path_is_symlink(to_path) - - if os.path.exists(to_path) and force: - if is_symlink: - Adapter.remove_file(to_path) - else: - Adapter.rmdir(to_path) - - shutil.move(from_path, to_path) - - # move to codebase (sys client) - def untar_package( - tar_path: str, dest_dir: str, rename_to: Optional[str] = None - ) -> None: - tar_path = Adapter._convert_path(tar_path) - tar_dir_name = None - with tarfile.open(tar_path, "r") as tarball: - tarball.extractall(dest_dir) - tar_dir_name = os.path.commonprefix(tarball.getnames()) - if rename_to: - downloaded_path = os.path.join(dest_dir, tar_dir_name) - desired_path = os.path.join(dest_dir, rename_to) - dbt.clients.system.rename(downloaded_path, desired_path, force=True) - - # helper - def _chmod_and_retry(func, path, exc_info): - """Define an error handler to pass to shutil.rmtree. - On Windows, when a file is marked read-only as git likes to do, rmtree will - fail. To handle that, on errors try to make the file writable. - We want to retry most operations here, but listdir is one that we know will - be useless. - """ - if func is os.listdir or os.name != "nt": - raise - os.chmod(path, stat.S_IREAD | stat.S_IWRITE) - # on error,this will raise. - func(path) - - # helper - def _absnorm(path): - return os.path.normcase(os.path.abspath(path)) - - # move to codebase (move only used in install) - def move(src, dst): - """A re-implementation of shutil.move that properly removes the source - directory on windows when it has read-only files in it and the move is - between two drives. - - This is almost identical to the real shutil.move, except it uses our rmtree - and skips handling non-windows OSes since the existing one works ok there. 
- """ - src = Adapter._convert_path(src) - dst = Adapter._convert_path(dst) - if os.name != "nt": - return shutil.move(src, dst) - - if os.path.isdir(dst): - if Adapter._absnorm(src) == Adapter._absnorm(dst): - os.rename(src, dst) - return - - dst = os.path.join(dst, os.path.basename(src.rstrip("/\\"))) - if os.path.exists(dst): - raise EnvironmentError("Path '{}' already exists".format(dst)) + return True - try: - os.rename(src, dst) - except OSError: - # probably different drives - if os.path.isdir(src): - if Adapter._absnorm(dst + "\\").startswith( - Adapter._absnorm(src + "\\") - ): - # dst is inside src - raise EnvironmentError( - "Cannot move a directory '{}' into itself '{}'".format(src, dst) - ) - shutil.copytree(src, dst, symlinks=True) - Adapter.rmtree(src) - else: - shutil.copy2(src, dst) - os.unlink(src) - - # move to codebase (sys client) - def rmtree(path): - """Recursively remove path. On permissions errors on windows, try to remove - the read-only flag and try again. - """ - path = Adapter._convert_path(path) - return shutil.rmtree(path, onerror=Adapter._chmod_and_retry) + +def find(): + pass + + +def info(path: str) -> Union[dict, Literal[False]]: + """Provides information about what is found at the given path. + + If info is called on a directory the size will be `None` + if Info is called on a resource that does not exist the response will be `False` + + N.B: despite my best efforts, getting a reliable cross-platform file creation time is + absurdly difficult. + See these links for information if we ever decide we have to have this feature: + * https://bugs.python.org/issue39533 + * https://github.com/ipfs-shipyard/py-datastore/blob/e566d40a8ca81d8628147e255fe7830b5f928a43/datastore/filesystem/util/statx.py # noqa: E501 + * https://github.com/ckarageorgkaneen/pystatx + + Args: + path: Full path being queried. + + Returns: + On success: A dict containing information about what is found at the given path. + On failure: `False` + + """ + # create a concrete path object. 
+ path: Path = Path(path) + + # return `False` if the resource doesn't exsist + if not path.exists(): + return False + + # calulate file size (`None` for dirs) + size = None if path.is_dir() else path.stat().st_size + + # return info on the resource + return {"size": size, "modified_at": path.stat().st_mtime} diff --git a/core/dbt/clients/git.py b/core/dbt/clients/git.py index 48247892f08..ad39ab51aef 100644 --- a/core/dbt/clients/git.py +++ b/core/dbt/clients/git.py @@ -2,7 +2,7 @@ import os.path from dbt.clients.system import run_cmd -from dbt.clients.storage_adapter import StorageAdapter as SA +from dbt.clients.storage import adapter as SA from dbt.logger import GLOBAL_LOGGER as logger import dbt.exceptions from packaging import version @@ -43,7 +43,7 @@ def clone(repo, cwd, dirname=None, remove_git_dir=False, revision=None, subdirec run_cmd(os.path.join(cwd, dirname or ''), ['git', 'sparse-checkout', 'set', subdirectory]) if remove_git_dir: - SA.adapter.rmdir(os.path.join(dirname, '.git')) + SA.rmdir(os.path.join(dirname, '.git')) return result diff --git a/core/dbt/clients/storage.py b/core/dbt/clients/storage.py new file mode 100644 index 00000000000..8da8479753c --- /dev/null +++ b/core/dbt/clients/storage.py @@ -0,0 +1,27 @@ +from importlib import import_module +from os import getenv +from dbt.logger import GLOBAL_LOGGER as logger +from .storage_adapter_type import StorageAdapterType +from typing import cast + +# TODO: +# ensure configured adapter has the correct module signature +# provide better not-ready error messagin + +# get configured storage adapter +_module_to_load = getenv( + "DBT_STORAGE_ADAPTER", "dbt.adapters.internal_storage.local_filesystem" +) + +# load module if it exists +try: + _adapter = cast(StorageAdapterType, import_module(_module_to_load)) + logger.debug(f"Storage adapter {_module_to_load} loaded") +except ModuleNotFoundError: + logger.error(f"Storage adapter {_module_to_load} not found") + +# run ready check +if not _adapter.ready_check(): + logger.error(f"Storage Adapter{_module_to_load} not ready") +else: + adapter = _adapter diff --git a/core/dbt/clients/storage_adapter.py b/core/dbt/clients/storage_adapter.py deleted file mode 100644 index e828d283b77..00000000000 --- a/core/dbt/clients/storage_adapter.py +++ /dev/null @@ -1,26 +0,0 @@ -from importlib import import_module -from dbt.adapters.internal_storage.local_filesystem import Adapter as configured_adapter - -DEFAULT_STORAGE_ADAPTER = "dbt.adapters.internal_storage.local_filesystem" -DEFAULT_STORAGE_ADAPTER_NAME = "local_filesystem_storage_adapter" - -import_module(DEFAULT_STORAGE_ADAPTER) - - -class StorageAdapter(): - def __init__(self) -> None: - # TODO: - # get the config - # ensure configured adapter exsists - # import configured adapter - # ensure configured adapter has the correct class signature - # ensure configured adapter can access it's datastore - pass - - # Dynamically generated class attribute that is the - # configured storage adapter - adapter = type( - DEFAULT_STORAGE_ADAPTER_NAME, - (configured_adapter,), - dict() - ) diff --git a/core/dbt/clients/storage_adapter_type.py b/core/dbt/clients/storage_adapter_type.py new file mode 100644 index 00000000000..02735b1c9e8 --- /dev/null +++ b/core/dbt/clients/storage_adapter_type.py @@ -0,0 +1,37 @@ +from typing import Any, Dict, Union +from typing_extensions import Literal + + +class StorageAdapterType: + """Class that defines type hinting for storage adapters. 
+ + This is needed because importlib (used in the storage client) and mypy aren't exactly friends. + N.B: https://stackoverflow.com/questions/48976499/mypy-importlib-module-functions + + """ + + @staticmethod + def ready_check() -> bool: + pass + + @staticmethod + def read(path: str, strip: bool = ...) -> str: + pass + + @staticmethod + def write( + path: str, content: Union[str, Dict[str, Any], None], overwrite: bool = ... + ) -> bool: + pass + + @staticmethod + def delete(path: str) -> bool: + pass + + @staticmethod + def find() -> None: + pass + + @staticmethod + def info(path: str) -> Union[dict, Literal[False]]: + pass diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 8cba4e3fbc6..9fa80e396c6 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -8,7 +8,7 @@ from dbt import flags from dbt.adapters.factory import get_adapter from dbt.clients import jinja -from dbt.clients.storage_adapter import StorageAdapter as SA +from dbt.clients.storage import adapter as SA from dbt.context.providers import generate_runtime_model from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.compiled import ( @@ -153,8 +153,8 @@ def __init__(self, config): self.config = config def initialize(self): - SA.adapter.make_directory(self.config.target_path) - SA.adapter.make_directory(self.config.modules_path) + SA.write(self.config.target_path, None) + SA.write(self.config.modules_path, None) # creates a ModelContext which is converted to # a dict for jinja rendering of SQL diff --git a/core/dbt/config/profile.py b/core/dbt/config/profile.py index f4207270380..66c7c3e523c 100644 --- a/core/dbt/config/profile.py +++ b/core/dbt/config/profile.py @@ -4,7 +4,7 @@ from dbt.dataclass_schema import ValidationError -from dbt.clients.storage_adapter import StorageAdapter as SA +from dbt.clients.storage import adapter as SA from dbt.clients.yaml_helper import load_yaml_text from dbt.contracts.connection import Credentials, HasCredentials from dbt.contracts.project import ProfileConfig, UserConfig @@ -52,7 +52,7 @@ def read_profile(profiles_dir: str) -> Dict[str, Any]: contents = None if os.path.isfile(path): try: - contents = SA.adapter.load_file_contents(path, strip=False) # type: ignore + contents = SA.read(path, strip=False) yaml_content = load_yaml_text(contents) if not yaml_content: msg = f'The profiles.yml file at {path} is empty' diff --git a/core/dbt/config/project.py b/core/dbt/config/project.py index 58b13d1d814..41ef9dc6788 100644 --- a/core/dbt/config/project.py +++ b/core/dbt/config/project.py @@ -9,7 +9,8 @@ import hashlib import os -from dbt.clients.storage_adapter import StorageAdapter as SA +from dbt.clients.storage import adapter as SA +from dbt.clients import system from dbt.clients.yaml_helper import load_yaml_text from dbt.contracts.connection import QueryComment from dbt.exceptions import DbtProjectError @@ -75,16 +76,16 @@ class IsFQNResource(Protocol): def _load_yaml(path): - contents = SA.adapter.load_file_contents(path) + contents = SA.read(path) return load_yaml_text(contents) def package_data_from_root(project_root): - package_filepath = SA.adapter.resolve_path_from_base( + package_filepath = system.resolve_path_from_base( 'packages.yml', project_root ) - if SA.adapter.path_exists(package_filepath): + if SA.info(package_filepath): packages_dict = _load_yaml(package_filepath) else: packages_dict = None @@ -147,7 +148,7 @@ def _raw_project_from(project_root: str) -> Dict[str, Any]: project_yaml_filepath = os.path.join(project_root, 
'dbt_project.yml') # get the project.yml contents - if not SA.adapter.path_exists(project_yaml_filepath): # type: ignore # noqa + if not SA.info(project_yaml_filepath): raise DbtProjectError( 'no dbt_project.yml found at expected path {}' .format(project_yaml_filepath) diff --git a/core/dbt/config/selectors.py b/core/dbt/config/selectors.py index 8329d7a4510..3a583d70539 100644 --- a/core/dbt/config/selectors.py +++ b/core/dbt/config/selectors.py @@ -7,7 +7,8 @@ from .renderer import SelectorRenderer -from dbt.clients.storage_adapter import StorageAdapter as SA +from dbt.clients.storage import adapter as SA +from dbt.clients import system from dbt.contracts.selection import SelectorFile from dbt.exceptions import DbtSelectorsError, RuntimeException from dbt.graph import parse_from_selectors_definition, SelectionSpec @@ -71,7 +72,7 @@ def from_path( cls, path: Path, renderer: SelectorRenderer, ) -> 'SelectorConfig': try: - data = load_yaml_text(SA.adapter.load_file_contents(str(path))) # type: ignore + data = load_yaml_text(SA.read(str(path))) # type: ignore except (ValidationError, RuntimeException) as exc: raise DbtSelectorsError( f'Could not read selector file: {exc}', @@ -87,12 +88,12 @@ def from_path( def selector_data_from_root(project_root: str) -> Dict[str, Any]: - selector_filepath = SA.adapter.resolve_path_from_base( # type: ignore + selector_filepath = system.resolve_path_from_base( # type: ignore 'selectors.yml', project_root ) - if SA.adapter.path_exists(selector_filepath): # type: ignore - selectors_dict = load_yaml_text(SA.adapter.load_file_contents(selector_filepath)) # type: ignore # noqa + if SA.info(selector_filepath): + selectors_dict = load_yaml_text(SA.read(selector_filepath)) # type: ignore # noqa else: selectors_dict = None return selectors_dict diff --git a/core/dbt/deps/base.py b/core/dbt/deps/base.py index 0c9cd4b3752..83b871a1d05 100644 --- a/core/dbt/deps/base.py +++ b/core/dbt/deps/base.py @@ -4,7 +4,7 @@ from contextlib import contextmanager from typing import List, Optional, Generic, TypeVar -from dbt.clients import system +from dbt.clients.storage import adapter as SA from dbt.contracts.project import ProjectPackageMetadata from dbt.logger import GLOBAL_LOGGER as logger @@ -30,13 +30,13 @@ def downloads_directory(): DOWNLOADS_PATH = tempfile.mkdtemp(prefix='dbt-downloads-') remove_downloads = True - system.make_directory(DOWNLOADS_PATH) + SA.write(DOWNLOADS_PATH, None) logger.debug("Set downloads directory='{}'".format(DOWNLOADS_PATH)) yield DOWNLOADS_PATH if remove_downloads: - system.rmtree(DOWNLOADS_PATH) + SA.delete(DOWNLOADS_PATH) DOWNLOADS_PATH = None diff --git a/core/dbt/deps/registry.py b/core/dbt/deps/registry.py index 4f98ce9ccab..f7cfd677b1c 100644 --- a/core/dbt/deps/registry.py +++ b/core/dbt/deps/registry.py @@ -3,6 +3,7 @@ from dbt import semver from dbt.clients import registry, system +from dbt.clients.storage import adapter as SA from dbt.contracts.project import ( RegistryPackageMetadata, RegistryPackage, @@ -58,7 +59,7 @@ def install(self, project, renderer): tar_path = os.path.realpath( os.path.join(get_downloads_path(), tar_name) ) - system.make_directory(os.path.dirname(tar_path)) + SA.write(os.path.dirname(tar_path), None) download_url = metadata.downloads.tarball system.download(download_url, tar_path) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 43b0d6c70ae..30f04dc4243 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -20,7 +20,7 @@ from dbt.node_types import NodeType from 
dbt.clients.jinja import get_rendered, MacroStack from dbt.clients.jinja_static import statically_extract_macro_calls -from dbt.clients.system import make_directory +from dbt.clients.storage import adapter as SA from dbt.config import Project, RuntimeConfig from dbt.context.docs import generate_runtime_docs from dbt.context.macro_resolver import MacroResolver, TestMacroNamespace @@ -435,9 +435,7 @@ def write_manifest_for_partial_parse(self): PARTIAL_PARSE_FILE_NAME) try: manifest_msgpack = self.manifest.to_msgpack() - make_directory(os.path.dirname(path)) - with open(path, 'wb') as fp: - fp.write(manifest_msgpack) + SA.write(path, manifest_msgpack) except Exception: raise diff --git a/core/dbt/parser/read_files.py b/core/dbt/parser/read_files.py index 24db6fe448e..932c379c764 100644 --- a/core/dbt/parser/read_files.py +++ b/core/dbt/parser/read_files.py @@ -1,4 +1,4 @@ -from dbt.clients.system import load_file_contents +from dbt.clients.storage import adapter as SA from dbt.contracts.files import ( FilePath, ParseFileType, SourceFile, FileHash, AnySourceFile, SchemaSourceFile ) @@ -12,7 +12,7 @@ def load_source_file( path: FilePath, parse_file_type: ParseFileType, project_name: str) -> AnySourceFile: - file_contents = load_file_contents(path.absolute_path, strip=False) + file_contents = SA.read(path.absolute_path, strip=False) checksum = FileHash.from_contents(file_contents) sf_cls = SchemaSourceFile if parse_file_type == ParseFileType.Schema else SourceFile source_file = sf_cls(path=path, checksum=checksum, @@ -54,7 +54,7 @@ def load_seed_source_file(match: FilePath, project_name) -> SourceFile: # We don't want to calculate a hash of this file. Use the path. source_file = SourceFile.big_seed(match) else: - file_contents = load_file_contents(match.absolute_path, strip=False) + file_contents = SA.read(match.absolute_path, strip=False) checksum = FileHash.from_contents(file_contents) source_file = SourceFile(path=match, checksum=checksum) source_file.contents = '' diff --git a/core/dbt/task/debug.py b/core/dbt/task/debug.py index 5143abd5d58..914087e01fa 100644 --- a/core/dbt/task/debug.py +++ b/core/dbt/task/debug.py @@ -7,6 +7,7 @@ from dbt.logger import GLOBAL_LOGGER as logger import dbt.clients.system import dbt.exceptions +from dbt.clients.storage import adapter as SA from dbt.adapters.factory import get_adapter, register_adapter from dbt.config import Project, Profile, PROFILES_DIR from dbt.config.renderer import DbtProjectYamlRenderer, ProfileRenderer @@ -255,7 +256,7 @@ def _load_profile(self): try: raw_profile_data = load_yaml_text( - dbt.clients.system.load_file_contents(self.profile_path) + SA.read(self.profile_path) ) except Exception: pass # we'll report this when we try to load the profile for real diff --git a/core/dbt/task/deps.py b/core/dbt/task/deps.py index c93dd680204..7c54b69fa04 100644 --- a/core/dbt/task/deps.py +++ b/core/dbt/task/deps.py @@ -9,7 +9,7 @@ from dbt.deps.resolver import resolve_packages from dbt.logger import GLOBAL_LOGGER as logger -from dbt.clients import system +from dbt.clients.storage import adapter as SA from dbt.task.base import BaseTask, move_to_nearest_project_dir @@ -43,7 +43,7 @@ def track_package_install( ) def run(self): - system.make_directory(self.config.modules_path) + SA.write(self.config.modules_path, None) packages = self.config.packages.packages if not packages: logger.info('Warning: No packages were found in packages.yml') diff --git a/core/dbt/task/init.py b/core/dbt/task/init.py index fadee6a760a..16c52acf979 100644 --- 
a/core/dbt/task/init.py +++ b/core/dbt/task/init.py @@ -3,6 +3,7 @@ import dbt.config import dbt.clients.system +from dbt.clients.storage import adapter as SA from dbt.version import _get_adapter_plugin_names from dbt.adapters.factory import load_plugin, get_include_paths @@ -50,7 +51,7 @@ def create_profiles_dir(self, profiles_dir): if not os.path.exists(profiles_dir): msg = "Creating dbt configuration folder at {}" logger.info(msg.format(profiles_dir)) - dbt.clients.system.make_directory(profiles_dir) + SA.write(profiles_dir, None) return True return False From ed3dbbdf970fa5d178b8da765c318f038f3b9322 Mon Sep 17 00:00:00 2001 From: Ian Knox <81931810+iknox-fa@users.noreply.github.com> Date: Thu, 5 Aug 2021 09:42:18 -0500 Subject: [PATCH 3/4] Update core/dbt/adapters/internal_storage/local_filesystem.py Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com> --- core/dbt/adapters/internal_storage/local_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/adapters/internal_storage/local_filesystem.py b/core/dbt/adapters/internal_storage/local_filesystem.py index 519f457d4e9..557a53f4a20 100644 --- a/core/dbt/adapters/internal_storage/local_filesystem.py +++ b/core/dbt/adapters/internal_storage/local_filesystem.py @@ -164,7 +164,7 @@ def delete(path: str) -> bool: if path.is_file(): path.unlink() - # remove directories recuersivly, surprisingly obnoxious to do in a cross-platform safe manner + # remove directories recursively, surprisingly obnoxious to do in a cross-platform safe manner if path.is_dir(): if platform in WINDOWS_PLATFORMS: # error handling for permissions on windows platforms From db2ef4d08482c9696203d8aafad5afae0b428d34 Mon Sep 17 00:00:00 2001 From: Ian Knox <81931810+iknox-fa@users.noreply.github.com> Date: Thu, 5 Aug 2021 09:42:27 -0500 Subject: [PATCH 4/4] Update core/dbt/clients/storage.py Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com> --- core/dbt/clients/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/clients/storage.py b/core/dbt/clients/storage.py index 8da8479753c..50e1280a9f2 100644 --- a/core/dbt/clients/storage.py +++ b/core/dbt/clients/storage.py @@ -6,7 +6,7 @@ # TODO: # ensure configured adapter has the correct module signature -# provide better not-ready error messagin +# provide better not-ready error messaging # get configured storage adapter _module_to_load = getenv(
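
The storage.py hunk above selects the storage adapter module at runtime via getenv and imports it dynamically, then the rest of the codebase consumes it as `from dbt.clients.storage import adapter as SA`. A minimal sketch of that loading pattern follows; it assumes a hypothetical DBT_STORAGE_ADAPTER environment variable defaulting to the local filesystem adapter, since the real variable name, default, and error handling in core/dbt/clients/storage.py are not shown in this patch.

    # Sketch only: the actual env var name, default module path, and
    # "not-ready" error handling in core/dbt/clients/storage.py may differ.
    from importlib import import_module
    from os import getenv

    # DBT_STORAGE_ADAPTER is a hypothetical name used here for illustration.
    _module_to_load = getenv(
        "DBT_STORAGE_ADAPTER",
        "dbt.adapters.internal_storage.local_filesystem",  # assumed default
    )

    # Import the configured module and expose its Adapter class, so callers
    # can do `from dbt.clients.storage import adapter as SA`.
    adapter = import_module(_module_to_load).Adapter

With the adapter exposed this way, call sites use SA.read, SA.write, SA.info, and SA.delete, matching the replacements made in the hunks above.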