From 8386b5f48186f5cfac364fe6e9b7f074c58fcea2 Mon Sep 17 00:00:00 2001 From: Colin Date: Thu, 5 Dec 2024 16:57:08 -0800 Subject: [PATCH] update to add catalog integration client --- dbt/adapters/base/__init__.py | 2 +- dbt/adapters/base/catalog.py | 59 ------------------------ dbt/adapters/base/impl.py | 14 ++++-- dbt/adapters/base/relation.py | 3 +- dbt/adapters/capability.py | 2 - dbt/adapters/clients/catalogs.py | 24 ++++++++++ dbt/adapters/contracts/catalog.py | 36 +++++++++++++++ dbt/adapters/contracts/connection.py | 4 +- dbt/adapters/contracts/relation.py | 1 - dbt/adapters/protocol.py | 39 ++++++++++------ dbt/adapters/relation_configs/formats.py | 19 ++++++++ 11 files changed, 118 insertions(+), 85 deletions(-) delete mode 100644 dbt/adapters/base/catalog.py create mode 100644 dbt/adapters/clients/catalogs.py create mode 100644 dbt/adapters/contracts/catalog.py create mode 100644 dbt/adapters/relation_configs/formats.py diff --git a/dbt/adapters/base/__init__.py b/dbt/adapters/base/__init__.py index 0600fc00..947cda22 100644 --- a/dbt/adapters/base/__init__.py +++ b/dbt/adapters/base/__init__.py @@ -1,6 +1,6 @@ from dbt.adapters.base.meta import available from dbt.adapters.base.column import Column -from dbt.adapters.base.catalog import ExternalCatalogIntegration +from dbt.adapters.base.catalog import CatalogIntegration from dbt.adapters.base.connections import BaseConnectionManager from dbt.adapters.base.impl import ( AdapterConfig, diff --git a/dbt/adapters/base/catalog.py b/dbt/adapters/base/catalog.py deleted file mode 100644 index a1aa66e6..00000000 --- a/dbt/adapters/base/catalog.py +++ /dev/null @@ -1,59 +0,0 @@ -import abc -from typing import Self, ValuesView - -from dbt_config.catalog_config import ExternalCatalog - -from dbt.adapters.base import BaseRelation, BaseConnectionManager - - -class ExternalCatalogIntegration(abc.ABC): - name: str - external_catalog: ExternalCatalog - _connection_manager: BaseConnectionManager - _exists: bool - - @classmethod - def create(cls, external_catalog: ExternalCatalog, connection_manager: BaseConnectionManager) -> Self: - integration = ExternalCatalogIntegration() - integration.external_catalog = external_catalog - integration.name = external_catalog.name - _connection_manager = connection_manager - return integration - - @abc.abstractmethod - def _exists(self) -> bool: - pass - - def exists(self) -> bool: - return self._exists - @abc.abstractmethod - def relation_exists(self, relation: BaseRelation) -> bool: - pass - - @abc.abstractmethod - def refresh_relation(self, table_name: str) -> None: - pass - - @abc.abstractmethod - def create_relation(self, table_name: str) -> None: - pass - - -class ExternalCatalogIntegrations: - def get(self, name: str) -> ExternalCatalogIntegration: - return self.integrations[name] - - @property - def integrations(self) -> dict[str, ExternalCatalogIntegration]: - return self.integrations - - @classmethod - def from_json_strings(cls, json_strings: ValuesView[str], - integration_class: ExternalCatalogIntegration, - connection_manager: BaseConnectionManager) -> Self: - new_instance = cls() - for json_string in json_strings: - external_catalog = ExternalCatalog.model_validate_json(json_string) - integration = integration_class.create(external_catalog, connection_manager) - new_instance.integrations[integration.name] = integration - return new_instance diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 25c5e5fd..4972a3bc 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -50,7 +50,6 @@ ) from dbt.adapters.base.column import Column as BaseColumn -from dbt.adapters.base.catalog import ExternalCatalogIntegration from dbt.adapters.base.connections import ( AdapterResponse, BaseConnectionManager, @@ -66,6 +65,8 @@ ) from dbt.adapters.cache import RelationsCache, _make_ref_key_dict from dbt.adapters.capability import Capability, CapabilityDict +from dbt.adapters.clients import catalogs as catalogs_client +from dbt.adapters.contracts.catalog import CatalogIntegration from dbt.adapters.contracts.connection import Credentials from dbt.adapters.contracts.macros import MacroResolverProtocol from dbt.adapters.contracts.relation import RelationConfig @@ -89,7 +90,7 @@ SnapshotTargetNotSnapshotTableError, UnexpectedNonTimestampError, ) -from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable +from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable, CatalogIntegrationConfig if TYPE_CHECKING: import agate @@ -261,13 +262,12 @@ class BaseAdapter(metaclass=AdapterMeta): Macros: - get_catalog - External Catalog support: Attach an implementation of ExternalCatalogIntegration """ Relation: Type[BaseRelation] = BaseRelation Column: Type[BaseColumn] = BaseColumn ConnectionManager: Type[BaseConnectionManager] - ExternalCatalogIntegration: Type[ExternalCatalogIntegration] + CatalogIntegrations: Dict[str, Type[CatalogIntegration]] # A set of clobber config fields accepted by this adapter # for use in materializations @@ -294,7 +294,13 @@ def __init__(self, config, mp_context: SpawnContext) -> None: self._macro_resolver: Optional[MacroResolverProtocol] = None self._macro_context_generator: Optional[MacroContextGeneratorCallable] = None self.behavior = DEFAULT_BASE_BEHAVIOR_FLAGS # type: ignore + self.add_catalog_integrations(config.catalog_integrations) + def add_catalog_integrations(self, catalog_integrations: Optional[List[CatalogIntegrationConfig]]) -> None: + if catalog_integrations: + for integration_config in catalog_integrations: + integration = self.CatalogIntegrations[integration_config.type](integration_config) + catalogs_client.add_catalog(integration) ### # Methods to set / access a macro resolver ### diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index b60528fe..8fe724ce 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -18,6 +18,7 @@ from dbt_common.exceptions import CompilationError, DbtRuntimeError from dbt_common.utils import deep_merge, filter_null_values +from dbt.adapters.base import CatalogIntegration from dbt.adapters.contracts.relation import ( ComponentName, HasQuoting, @@ -72,7 +73,7 @@ class BaseRelation(FakeAPIObject, Hashable): # e.g. adding RelationType.View in dbt-postgres requires that you define: # include/postgres/macros/relations/view/replace.sql::postgres__get_replace_view_sql() replaceable_relations: SerializableIterable = field(default_factory=frozenset) - catalog: Optional[str] = None + catalog: Optional[CatalogIntegration] = None def _is_exactish_match(self, field: ComponentName, value: str) -> bool: if self.dbt_created and self.quote_policy.get_part(field) is False: diff --git a/dbt/adapters/capability.py b/dbt/adapters/capability.py index ca61f752..f0243053 100644 --- a/dbt/adapters/capability.py +++ b/dbt/adapters/capability.py @@ -21,8 +21,6 @@ class Capability(str, Enum): """Indicates support for getting catalog information including table-level and column-level metadata for a single relation.""" - CreateExternalCatalog = "CreateExternalCatalog" - MicrobatchConcurrency = "MicrobatchConcurrency" """Indicates support running the microbatch incremental materialization strategy concurrently across threads.""" diff --git a/dbt/adapters/clients/catalogs.py b/dbt/adapters/clients/catalogs.py new file mode 100644 index 00000000..d5cbe65a --- /dev/null +++ b/dbt/adapters/clients/catalogs.py @@ -0,0 +1,24 @@ +from dbt.adapters.contracts.catalog import CatalogIntegration + + +class CatalogIntegrations: + def get(self, name: str) -> CatalogIntegration: + return self.integrations[name] + + @property + def integrations(self) -> dict[str, CatalogIntegration]: + return self.integrations + + def add_integration(self, integration: CatalogIntegration): + self.integrations[integration.name] = integration + + +_CATALOG_CLIENT = CatalogIntegrations() + + +def get_catalog(integration_name: str) -> CatalogIntegration: + return _CATALOG_CLIENT.get(integration_name) + + +def add_catalog(integration: CatalogIntegration): + _CATALOG_CLIENT.add_integration(integration) diff --git a/dbt/adapters/contracts/catalog.py b/dbt/adapters/contracts/catalog.py new file mode 100644 index 00000000..db27df0f --- /dev/null +++ b/dbt/adapters/contracts/catalog.py @@ -0,0 +1,36 @@ +import abc +from enum import Enum +from typing import Optional + +from dbt.adapters.protocol import CatalogIntegrationConfig +from dbt.adapters.relation_configs.formats import TableFormat + + +class CatalogIntegrationType(Enum): + iceberg_rest = 'iceberg_rest' + glue = 'glue' + unity = 'unity' + + +class CatalogIntegration(abc.ABC): + """ + An external catalog integration is a connection to an external catalog that can be used to + interact with the catalog. This class is an abstract base class that should be subclassed by + specific integrations in the adapters. + + """ + name: str + table_format: TableFormat + type: CatalogIntegrationType + external_volume: Optional[str] = None + namespace: Optional[str] = None + + def __init__( + self, integration_config: CatalogIntegrationConfig + ): + self.name = integration_config.name + self.table_format = TableFormat(integration_config.table_format) + self.type = CatalogIntegrationType(integration_config.type) + self.external_volume = integration_config.external_volume + self.namespace = integration_config.namespace + diff --git a/dbt/adapters/contracts/connection.py b/dbt/adapters/contracts/connection.py index 1c317c53..a40d0672 100644 --- a/dbt/adapters/contracts/connection.py +++ b/dbt/adapters/contracts/connection.py @@ -19,7 +19,6 @@ ValidatedStringMixin, dbtClassMixin, ) -from dbt_config.catalog_config import ExternalCatalogConfig # TODO: this is a very bad dependency - shared global state @@ -30,6 +29,7 @@ from mashumaro.jsonschema.annotations import Pattern from typing_extensions import Protocol, Annotated +from dbt.adapters.contracts.catalog import CatalogIntegrations from dbt.adapters.events.types import NewConnectionOpening from dbt.adapters.utils import translate_aliases @@ -229,4 +229,4 @@ class AdapterRequiredConfig(HasCredentials, Protocol): cli_vars: Dict[str, Any] target_path: str log_cache_events: bool - catalogs = Optional[ExternalCatalogConfig] + catalog_integrations: Optional[CatalogIntegrations] diff --git a/dbt/adapters/contracts/relation.py b/dbt/adapters/contracts/relation.py index 636ea9cc..7bb0c531 100644 --- a/dbt/adapters/contracts/relation.py +++ b/dbt/adapters/contracts/relation.py @@ -10,7 +10,6 @@ from dbt_common.dataclass_schema import StrEnum, dbtClassMixin from dbt_common.exceptions import CompilationError, DataclassNotDictError from dbt_common.utils import deep_merge -from dbt_config.catalog_config import ExternalCatalog from typing_extensions import Protocol diff --git a/dbt/adapters/protocol.py b/dbt/adapters/protocol.py index 58ff001b..7a793219 100644 --- a/dbt/adapters/protocol.py +++ b/dbt/adapters/protocol.py @@ -41,10 +41,19 @@ class ConnectionManagerProtocol(Protocol): class ColumnProtocol(Protocol): pass -class ExternalCatalogIntegrationProtocol(Protocol): + +class CatalogIntegrationProtocol(Protocol): pass +class CatalogIntegrationConfig(Protocol): + name: str + table_format: str + type: str + external_volume: Optional[str] + namespace: Optional[str] + + Self = TypeVar("Self", bound="RelationProtocol") @@ -54,10 +63,10 @@ def get_default_quote_policy(cls) -> Policy: ... @classmethod def create_from( - cls: Type[Self], - quoting: HasQuoting, - relation_config: RelationConfig, - **kwargs: Any, + cls: Type[Self], + quoting: HasQuoting, + relation_config: RelationConfig, + **kwargs: Any, ) -> Self: ... @@ -65,16 +74,16 @@ def create_from( ConnectionManager_T = TypeVar("ConnectionManager_T", bound=ConnectionManagerProtocol) Relation_T = TypeVar("Relation_T", bound=RelationProtocol) Column_T = TypeVar("Column_T", bound=ColumnProtocol) -ExtCatInteg_T = TypeVar("ExtCatInteg_T", bound=ExternalCatalogIntegrationProtocol) +ExtCatInteg_T = TypeVar("ExtCatInteg_T", bound=CatalogIntegrationProtocol) class MacroContextGeneratorCallable(Protocol): def __call__( - self, - macro_protocol: MacroProtocol, - config: AdapterRequiredConfig, - macro_resolver: MacroResolverProtocol, - package_name: Optional[str], + self, + macro_protocol: MacroProtocol, + config: AdapterRequiredConfig, + macro_resolver: MacroResolverProtocol, + package_name: Optional[str], ) -> Dict[str, Any]: ... @@ -97,7 +106,7 @@ class AdapterProtocol( # type: ignore[misc] Relation: Type[Relation_T] ConnectionManager: Type[ConnectionManager_T] connections: ConnectionManager_T - ExternalCatalogIntegration: Type[ExtCatInteg_T] + CatalogIntegration: Type[ExtCatInteg_T] def __init__(self, config: AdapterRequiredConfig) -> None: ... @@ -108,8 +117,8 @@ def get_macro_resolver(self) -> Optional[MacroResolverProtocol]: ... def clear_macro_resolver(self) -> None: ... def set_macro_context_generator( - self, - macro_context_generator: MacroContextGeneratorCallable, + self, + macro_context_generator: MacroContextGeneratorCallable, ) -> None: ... @classmethod @@ -152,5 +161,5 @@ def close(cls, connection: Connection) -> Connection: ... def commit_if_has_connection(self) -> None: ... def execute( - self, sql: str, auto_begin: bool = False, fetch: bool = False + self, sql: str, auto_begin: bool = False, fetch: bool = False ) -> Tuple[AdapterResponse, "agate.Table"]: ... diff --git a/dbt/adapters/relation_configs/formats.py b/dbt/adapters/relation_configs/formats.py new file mode 100644 index 00000000..f440e530 --- /dev/null +++ b/dbt/adapters/relation_configs/formats.py @@ -0,0 +1,19 @@ +from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11 +from typing_extensions import Self + + +class TableFormat(StrEnum): + """ + Some platforms may refer to this 'Object' or 'File Format'. + Data practitioners and interfaces refer to this as 'Table Format's, hence the term's use here. + """ + + DEFAULT = "default" + ICEBERG = "iceberg" + + @classmethod + def default(cls) -> Self: + return cls("default") + + def __str__(self): + return self.value