-
Notifications
You must be signed in to change notification settings - Fork 91
/
credentials.py
286 lines (236 loc) · 9.97 KB
/
credentials.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import os
from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from urllib.parse import urlparse
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.exceptions import DbtRuntimeError
from dbt.adapters.contracts.connection import Credentials
from dbt.adapters.duckdb.secrets import DEFAULT_SECRET_PREFIX
from dbt.adapters.duckdb.secrets import Secret
@dataclass
class Attachment(dbtClassMixin):
    # Describes one extra database that gets attached to the DuckDB
    # instance through an ATTACH statement (see to_sql below).

    # Location of the database to attach; this may be a URL
    path: str

    # Kind of database being attached; when unset no TYPE option is emitted
    # (DuckDB then treats it as a duckdb database; other types may come from
    # an extension)
    type: Optional[str] = None

    # Optional name under which the attached database is exposed
    alias: Optional[str] = None

    # True to attach the database read-only, False for read/write
    read_only: bool = False

    def to_sql(self) -> str:
        """Render this attachment as a DuckDB ``ATTACH`` statement.

        Returns:
            A string of the form ``ATTACH '<path>' [AS <alias>] [(<options>)]``
            where the options are ``TYPE <type>`` and/or ``READ_ONLY``.
        """
        # ATTACH does not accept query parameters, so cut them off the path.
        # NOTE(review): when the path carries no query string this removes
        # every bare "?" from the path, because the search text is just "?".
        query = urlparse(self.path).query
        target = self.path.replace(f"?{query}", "")

        pieces = [f"ATTACH '{target}'"]
        if self.alias:
            pieces.append(f" AS {self.alias}")

        # Collect the parenthesised option list in a fixed order: TYPE first
        flags = []
        if self.type:
            flags.append(f"TYPE {self.type}")
        if self.read_only:
            flags.append("READ_ONLY")
        if flags:
            pieces.append(f" ({', '.join(flags)})")

        return "".join(pieces)
@dataclass
class PluginConfig(dbtClassMixin):
    # Configuration entry for a single dbt-duckdb plugin.

    # Identifies the plugin to load (e.g. "motherduck")
    module: str

    # Optional alternative name for the plugin
    # NOTE(review): how the alias is consumed is not visible here - confirm
    alias: Optional[str] = None

    # Free-form options whose meaning is defined by the plugin itself
    config: Optional[Dict[str, Any]] = None
@dataclass
class Remote(dbtClassMixin):
    # Settings describing a remote environment/connection.

    # Address and port of the remote endpoint
    host: str
    port: int

    # Login name used for the remote connection
    user: str

    # Optional password that goes with the user
    password: Optional[str] = None
@dataclass
class Retries(dbtClassMixin):
    # Retry policy for connecting to DuckDB and for running queries.

    # How many times the initial duckdb.connect call is attempted, giving
    # another process time to release its lock on the database file
    connect_attempts: int = 1

    # How many times a query is attempted when it fails with one of the
    # retryable exceptions; unset by default
    query_attempts: Optional[int] = None

    # Names of the exceptions that qualify for a retry
    retryable_exceptions: List[str] = field(default_factory=lambda: ["IOException"])
@dataclass
class Extension(dbtClassMixin):
    # Identifies a DuckDB extension by name and repository.

    # Name of the extension
    name: str

    # Repository associated with the extension
    # NOTE(review): presumably where the extension is installed from - confirm
    repo: str
@dataclass
class DuckDBCredentials(Credentials):
    # Connection profile for the DuckDB adapter: where the database lives plus
    # every option applied when a connection is set up.

    database: str = "main"
    schema: str = "main"
    path: str = ":memory:"

    # Any connection-time configuration information that we need to pass
    # to DuckDB (e.g., if we need to enable using unsigned extensions)
    config_options: Optional[Dict[str, Any]] = None

    # any DuckDB extensions we want to install and load (httpfs, parquet, etc.)
    extensions: Optional[List[Union[str, Dict[str, str]]]] = None

    # any additional pragmas we want to configure on our DuckDB connections;
    # a list of the built-in pragmas can be found here:
    # https://duckdb.org/docs/sql/configuration
    # (and extensions may add their own pragmas as well)
    settings: Optional[Dict[str, Any]] = None

    # secrets for connecting to cloud services AWS S3, Azure, Cloudflare R2,
    # Google Cloud and Huggingface.
    secrets: Optional[List[Dict[str, Any]]] = None

    # the root path to use for any external materializations that are specified
    # in this dbt project; defaults to "." (the current working directory)
    external_root: str = "."

    # identify whether to use the default credential provider chain for AWS/GCloud
    # instead of statically defined environment variables
    # (only the value "aws" is accepted by __post_init__)
    use_credential_provider: Optional[str] = None

    # A list of additional databases that should be attached to the running
    # DuckDB instance to make them available for use in models; see the
    # schema for the Attachment dataclass above for what fields it can contain
    attach: Optional[List[Attachment]] = None

    # A list of filesystems to attach to the DuckDB database via the fsspec
    # interface; see https://duckdb.org/docs/guides/python/filesystems.html
    #
    # Each dictionary entry must have a "fs" entry to indicate which
    # fsspec implementation should be loaded, and then an arbitrary additional
    # number of key-value pairs that will be passed as arguments to the fsspec
    # registry method.
    filesystems: Optional[List[Dict[str, Any]]] = None

    # Used to configure remote environments/connections
    remote: Optional[Remote] = None

    # A list of dbt-duckdb plugins that can be used to customize the
    # behavior of loading source data and/or storing the relations that are
    # created by SQL or Python models; see the plugins module for more details.
    plugins: Optional[List[PluginConfig]] = None

    # Whether to disable transactions when executing SQL statements; this
    # is useful when we would like the resulting DuckDB database file to
    # be as small as possible.
    disable_transactions: bool = False

    # Whether to keep the DuckDB connection open between invocations of dbt
    # (we do this automatically for in-memory or MD connections, but not for
    # local DuckDB files, but this is a way to override that behavior)
    keep_open: bool = False

    # A list of paths to Python modules that should be loaded into the
    # running Python environment when dbt is invoked; this is useful for
    # loading custom dbt-duckdb plugins or locally defined modules that
    # provide helper functions for dbt Python models.
    module_paths: Optional[List[str]] = None

    # An optional strategy for allowing retries when certain types of
    # exceptions occur on a model run (e.g., IOExceptions that were caused
    # by networking issues)
    retries: Optional[Retries] = None

    def __post_init__(self):
        """Normalize optional fields and build the Secret objects.

        Raises:
            ValueError: if ``use_credential_provider`` is set to anything
                other than ``"aws"``.
            KeyError: if a secret definition has no ``"type"`` entry.
        """
        self.settings = self.settings or {}
        # Take a copy of the list so that the backward-compatibility secret
        # appended below never ends up in a list owned by the caller.
        self.secrets = list(self.secrets) if self.secrets else []
        self._secrets = []

        # Add MotherDuck plugin if the path is a MotherDuck database
        # and plugin was not specified in profile.yml
        if self.is_motherduck:
            if self.plugins is None:
                self.plugins = []
            if not any(plugin.module == "motherduck" for plugin in self.plugins):
                self.plugins.append(PluginConfig(module="motherduck"))

        # For backward compatibility, to be deprecated in the future
        if self.use_credential_provider:
            if self.use_credential_provider == "aws":
                self.secrets.append({"type": "s3", "provider": "credential_chain"})
            else:
                raise ValueError(
                    "Unsupported value for use_credential_provider: "
                    + self.use_credential_provider
                )

        built = []
        for num, secret in enumerate(self.secrets):
            # Consume "type"/"name" from a copy: popping them from the
            # original dict would strip the definitions held in self.secrets
            # and make a second initialization (e.g. dataclasses.replace)
            # fail with KeyError: 'type'.
            spec = dict(secret)
            secret_type = spec.pop("type")
            # Unnamed secrets get a positional default name (1-based)
            name = spec.pop("name", f"{DEFAULT_SECRET_PREFIX}{num + 1}")
            built.append(Secret.create(secret_type=secret_type, name=name, **spec))
        self._secrets = built

    def secrets_sql(self) -> List[str]:
        """Return the SQL rendered by ``to_sql()`` for every configured secret."""
        return [secret.to_sql() for secret in self._secrets]

    @property
    def motherduck_attach(self):
        """Return the attached databases whose path has a MotherDuck scheme."""
        return [
            attached_db
            for attached_db in self.attach or []
            if self._is_motherduck(urlparse(attached_db.path).scheme)
        ]

    @property
    def is_motherduck_attach(self):
        """Return True when at least one attached database is on MotherDuck."""
        return len(self.motherduck_attach) > 0

    @property
    def is_motherduck(self):
        """Return True when the path or any attachment has a MotherDuck scheme."""
        parsed = urlparse(self.path)
        return self._is_motherduck(parsed.scheme) or self.is_motherduck_attach

    @staticmethod
    def _is_motherduck(scheme: str) -> bool:
        """Return True when ``scheme`` is one of the MotherDuck URL schemes."""
        return scheme in {"md", "motherduck"}

    @classmethod
    def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]:
        """Derive and validate the ``database`` entry from ``path``.

        The database name is ``"memory"`` when no path or ``":memory:"`` is
        given, otherwise the file name of the path without its extension.
        For MotherDuck paths ``disable_transactions`` defaults to True and an
        empty database name falls back to ``"my_db"``.

        Raises:
            DbtRuntimeError: if ``database`` contradicts the name derived
                from ``path`` (unless ``remote`` is configured), or if no
                database name can be derived from ``path``.
        """
        data = super().__pre_deserialize__(data)
        path = data.get("path")
        path_db = None
        if path is None or path == ":memory:":
            path_db = "memory"
        else:
            parsed = urlparse(path)
            base_file = os.path.basename(parsed.path)
            path_db = os.path.splitext(base_file)[0]
            # For MotherDuck, turn on disable_transactions unless
            # it's explicitly set already by the user
            if cls._is_motherduck(parsed.scheme):
                if "disable_transactions" not in data:
                    data["disable_transactions"] = True
                if path_db == "":
                    path_db = "my_db"

        if path_db and "database" not in data:
            data["database"] = path_db
        elif path_db and data["database"] != path_db:
            # A mismatch is only tolerated when a remote is configured
            if not data.get("remote"):
                raise DbtRuntimeError(
                    "Inconsistency detected between 'path' and 'database' fields in profile; "
                    f"the 'database' property must be set to '{path_db}' to match the 'path'"
                )
        elif not path_db:
            raise DbtRuntimeError(
                "Unable to determine target database name from 'path' field in profile"
            )
        return data

    @property
    def unique_field(self) -> str:
        """
        This property returns a unique field for the database connection.
        If the connection is remote, it returns the host and port as a string.
        If the connection is local, it returns the path and external root as a string.
        """
        if self.remote:
            return self.remote.host + str(self.remote.port)
        else:
            return self.path + self.external_root

    @property
    def type(self):
        """Return the adapter type name, always ``"duckdb"``."""
        return "duckdb"

    def _connection_keys(self):
        """Return the names of the fields listed as connection keys.

        ``secrets``, ``keep_open``, ``module_paths`` and ``retries`` are not
        part of the tuple.
        """
        return (
            "database",
            "schema",
            "path",
            "config_options",
            "extensions",
            "settings",
            "external_root",
            "use_credential_provider",
            "attach",
            "filesystems",
            "remote",
            "plugins",
            "disable_transactions",
        )