Skip to content

Commit

Permalink
update to add catalog integration client
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-rogers-dbt committed Dec 6, 2024
1 parent 41bc4a4 commit 8386b5f
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 85 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dbt.adapters.base.meta import available
from dbt.adapters.base.column import Column
from dbt.adapters.base.catalog import ExternalCatalogIntegration
from dbt.adapters.base.catalog import CatalogIntegration
from dbt.adapters.base.connections import BaseConnectionManager
from dbt.adapters.base.impl import (
AdapterConfig,
Expand Down
59 changes: 0 additions & 59 deletions dbt/adapters/base/catalog.py

This file was deleted.

14 changes: 10 additions & 4 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
)

from dbt.adapters.base.column import Column as BaseColumn
from dbt.adapters.base.catalog import ExternalCatalogIntegration
from dbt.adapters.base.connections import (
AdapterResponse,
BaseConnectionManager,
Expand All @@ -66,6 +65,8 @@
)
from dbt.adapters.cache import RelationsCache, _make_ref_key_dict
from dbt.adapters.capability import Capability, CapabilityDict
from dbt.adapters.clients import catalogs as catalogs_client
from dbt.adapters.contracts.catalog import CatalogIntegration
from dbt.adapters.contracts.connection import Credentials
from dbt.adapters.contracts.macros import MacroResolverProtocol
from dbt.adapters.contracts.relation import RelationConfig
Expand All @@ -89,7 +90,7 @@
SnapshotTargetNotSnapshotTableError,
UnexpectedNonTimestampError,
)
from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable
from dbt.adapters.protocol import AdapterConfig, MacroContextGeneratorCallable, CatalogIntegrationConfig

if TYPE_CHECKING:
import agate
Expand Down Expand Up @@ -261,13 +262,12 @@ class BaseAdapter(metaclass=AdapterMeta):
Macros:
- get_catalog
External Catalog support: Attach an implementation of ExternalCatalogIntegration
"""

Relation: Type[BaseRelation] = BaseRelation
Column: Type[BaseColumn] = BaseColumn
ConnectionManager: Type[BaseConnectionManager]
ExternalCatalogIntegration: Type[ExternalCatalogIntegration]
CatalogIntegrations: Dict[str, Type[CatalogIntegration]]

# A set of clobber config fields accepted by this adapter
# for use in materializations
Expand All @@ -294,7 +294,13 @@ def __init__(self, config, mp_context: SpawnContext) -> None:
self._macro_resolver: Optional[MacroResolverProtocol] = None
self._macro_context_generator: Optional[MacroContextGeneratorCallable] = None
self.behavior = DEFAULT_BASE_BEHAVIOR_FLAGS # type: ignore
self.add_catalog_integrations(config.catalog_integrations)

def add_catalog_integrations(
    self, catalog_integrations: Optional[List[CatalogIntegrationConfig]]
) -> None:
    """Instantiate and register a catalog integration for each supplied config.

    Each config's ``type`` is used as the key into ``self.CatalogIntegrations``
    to pick the integration class; the class is called with the config and the
    resulting instance is handed to ``catalogs_client.add_catalog``.

    Args:
        catalog_integrations: Integration configs to register. ``None`` or an
            empty list makes this a no-op.

    Raises:
        KeyError: if a config's ``type`` is not a key of
            ``self.CatalogIntegrations``.
    """
    if not catalog_integrations:
        return
    for cfg in catalog_integrations:
        integration_cls = self.CatalogIntegrations[cfg.type]
        catalogs_client.add_catalog(integration_cls(cfg))
###
# Methods to set / access a macro resolver
###
Expand Down
3 changes: 2 additions & 1 deletion dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dbt_common.exceptions import CompilationError, DbtRuntimeError
from dbt_common.utils import deep_merge, filter_null_values

from dbt.adapters.base import CatalogIntegration
from dbt.adapters.contracts.relation import (
ComponentName,
HasQuoting,
Expand Down Expand Up @@ -72,7 +73,7 @@ class BaseRelation(FakeAPIObject, Hashable):
# e.g. adding RelationType.View in dbt-postgres requires that you define:
# include/postgres/macros/relations/view/replace.sql::postgres__get_replace_view_sql()
replaceable_relations: SerializableIterable = field(default_factory=frozenset)
catalog: Optional[str] = None
catalog: Optional[CatalogIntegration] = None

def _is_exactish_match(self, field: ComponentName, value: str) -> bool:
if self.dbt_created and self.quote_policy.get_part(field) is False:
Expand Down
2 changes: 0 additions & 2 deletions dbt/adapters/capability.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ class Capability(str, Enum):
"""Indicates support for getting catalog information including table-level and column-level metadata for a single
relation."""

CreateExternalCatalog = "CreateExternalCatalog"

MicrobatchConcurrency = "MicrobatchConcurrency"
"""Indicates support running the microbatch incremental materialization strategy concurrently across threads."""

Expand Down
24 changes: 24 additions & 0 deletions dbt/adapters/clients/catalogs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from dbt.adapters.contracts.catalog import CatalogIntegration


class CatalogIntegrations:
    """In-memory registry of catalog integrations, keyed by integration name."""

    def __init__(self) -> None:
        # Backing store for the ``integrations`` property. The property must
        # read from this private attribute; returning ``self.integrations``
        # from the property would recurse until RecursionError.
        self._integrations: "dict[str, CatalogIntegration]" = {}

    def get(self, name: str) -> "CatalogIntegration":
        """Return the integration registered under ``name``.

        Raises:
            KeyError: if no integration with that name has been added.
        """
        return self.integrations[name]

    @property
    def integrations(self) -> "dict[str, CatalogIntegration]":
        """The live name -> integration mapping (the stored dict, not a copy)."""
        return self._integrations

    def add_integration(self, integration: "CatalogIntegration"):
        """Register ``integration`` under ``integration.name``.

        An existing entry with the same name is replaced.
        """
        self.integrations[integration.name] = integration


# Module-level registry instance shared by get_catalog() and add_catalog().
_CATALOG_CLIENT = CatalogIntegrations()


def get_catalog(integration_name: str) -> CatalogIntegration:
    """Return the integration stored under ``integration_name`` in the module-level registry."""
    registry = _CATALOG_CLIENT
    return registry.get(integration_name)


def add_catalog(integration: CatalogIntegration):
    """Add ``integration`` to the module-level registry."""
    registry = _CATALOG_CLIENT
    registry.add_integration(integration)
36 changes: 36 additions & 0 deletions dbt/adapters/contracts/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import abc
from enum import Enum
from typing import Optional

from dbt.adapters.protocol import CatalogIntegrationConfig
from dbt.adapters.relation_configs.formats import TableFormat


class CatalogIntegrationType(Enum):
    """Closed set of catalog integration type identifiers."""

    iceberg_rest = "iceberg_rest"
    glue = "glue"
    unity = "unity"


class CatalogIntegration(abc.ABC):
    """
    A connection to an external catalog, used to interact with that catalog.

    This is an abstract base class; adapters should subclass it for each
    specific integration they provide.
    """

    name: str
    table_format: TableFormat
    type: CatalogIntegrationType
    external_volume: Optional[str] = None
    namespace: Optional[str] = None

    def __init__(self, integration_config: CatalogIntegrationConfig):
        """Populate the integration's attributes from ``integration_config``.

        ``table_format`` and ``type`` arrive as plain values on the config and
        are converted to ``TableFormat`` / ``CatalogIntegrationType`` members
        here; the remaining fields are copied through unchanged.
        """
        cfg = integration_config
        self.name = cfg.name
        # Enum lookups by value; an unrecognised value raises from the enum constructor.
        self.table_format = TableFormat(cfg.table_format)
        self.type = CatalogIntegrationType(cfg.type)
        self.external_volume = cfg.external_volume
        self.namespace = cfg.namespace

4 changes: 2 additions & 2 deletions dbt/adapters/contracts/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
ValidatedStringMixin,
dbtClassMixin,
)
from dbt_config.catalog_config import ExternalCatalogConfig


# TODO: this is a very bad dependency - shared global state
Expand All @@ -30,6 +29,7 @@
from mashumaro.jsonschema.annotations import Pattern
from typing_extensions import Protocol, Annotated

from dbt.adapters.contracts.catalog import CatalogIntegrations
from dbt.adapters.events.types import NewConnectionOpening
from dbt.adapters.utils import translate_aliases

Expand Down Expand Up @@ -229,4 +229,4 @@ class AdapterRequiredConfig(HasCredentials, Protocol):
cli_vars: Dict[str, Any]
target_path: str
log_cache_events: bool
catalogs = Optional[ExternalCatalogConfig]
catalog_integrations: Optional[CatalogIntegrations]
1 change: 0 additions & 1 deletion dbt/adapters/contracts/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from dbt_common.dataclass_schema import StrEnum, dbtClassMixin
from dbt_common.exceptions import CompilationError, DataclassNotDictError
from dbt_common.utils import deep_merge
from dbt_config.catalog_config import ExternalCatalog
from typing_extensions import Protocol


Expand Down
39 changes: 24 additions & 15 deletions dbt/adapters/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,19 @@ class ConnectionManagerProtocol(Protocol):
class ColumnProtocol(Protocol):
pass

class ExternalCatalogIntegrationProtocol(Protocol):

class CatalogIntegrationProtocol(Protocol):
    """Structural type for catalog integration classes; declares no members."""


class CatalogIntegrationConfig(Protocol):
    """Structural type listing the attributes a catalog integration config must expose."""

    # Required string fields.
    name: str
    table_format: str
    type: str
    # Fields that may be None.
    external_volume: Optional[str]
    namespace: Optional[str]


Self = TypeVar("Self", bound="RelationProtocol")


Expand All @@ -54,27 +63,27 @@ def get_default_quote_policy(cls) -> Policy: ...

@classmethod
def create_from(
cls: Type[Self],
quoting: HasQuoting,
relation_config: RelationConfig,
**kwargs: Any,
cls: Type[Self],
quoting: HasQuoting,
relation_config: RelationConfig,
**kwargs: Any,
) -> Self: ...


AdapterConfig_T = TypeVar("AdapterConfig_T", bound=AdapterConfig)
ConnectionManager_T = TypeVar("ConnectionManager_T", bound=ConnectionManagerProtocol)
Relation_T = TypeVar("Relation_T", bound=RelationProtocol)
Column_T = TypeVar("Column_T", bound=ColumnProtocol)
ExtCatInteg_T = TypeVar("ExtCatInteg_T", bound=ExternalCatalogIntegrationProtocol)
ExtCatInteg_T = TypeVar("ExtCatInteg_T", bound=CatalogIntegrationProtocol)


class MacroContextGeneratorCallable(Protocol):
def __call__(
self,
macro_protocol: MacroProtocol,
config: AdapterRequiredConfig,
macro_resolver: MacroResolverProtocol,
package_name: Optional[str],
self,
macro_protocol: MacroProtocol,
config: AdapterRequiredConfig,
macro_resolver: MacroResolverProtocol,
package_name: Optional[str],
) -> Dict[str, Any]: ...


Expand All @@ -97,7 +106,7 @@ class AdapterProtocol( # type: ignore[misc]
Relation: Type[Relation_T]
ConnectionManager: Type[ConnectionManager_T]
connections: ConnectionManager_T
ExternalCatalogIntegration: Type[ExtCatInteg_T]
CatalogIntegration: Type[ExtCatInteg_T]

def __init__(self, config: AdapterRequiredConfig) -> None: ...

Expand All @@ -108,8 +117,8 @@ def get_macro_resolver(self) -> Optional[MacroResolverProtocol]: ...
def clear_macro_resolver(self) -> None: ...

def set_macro_context_generator(
self,
macro_context_generator: MacroContextGeneratorCallable,
self,
macro_context_generator: MacroContextGeneratorCallable,
) -> None: ...

@classmethod
Expand Down Expand Up @@ -152,5 +161,5 @@ def close(cls, connection: Connection) -> Connection: ...
def commit_if_has_connection(self) -> None: ...

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[AdapterResponse, "agate.Table"]: ...
19 changes: 19 additions & 0 deletions dbt/adapters/relation_configs/formats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self


class TableFormat(StrEnum):
"""
Some platforms may refer to this 'Object' or 'File Format'.
Data practitioners and interfaces refer to this as 'Table Format's, hence the term's use here.
"""

DEFAULT = "default"
ICEBERG = "iceberg"

@classmethod
def default(cls) -> Self:
return cls("default")

def __str__(self):
return self.value

0 comments on commit 8386b5f

Please sign in to comment.