Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use show schemas for snowflake list_schemas (#2166) #2171

Merged
merged 4 commits into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
## dbt 0.16.next (Release TBD)

### Breaking changes
- When overriding the snowflake__list_schemas macro, you must now run a result with a column named 'name' instead of the first column ([#2171](https://github.com/fishtown-analytics/dbt/pull/2171))
- dbt no longer supports databases with greater than 10,000 schemas ([#2171](https://github.com/fishtown-analytics/dbt/pull/2171))

### Features
- Remove the requirement to have a passphrase when using Snowflake key pair authentication ([#1805](https://github.com/fishtown-analytics/dbt/issues/1805), [#2164](https://github.com/fishtown-analytics/dbt/pull/2164))
- Adding optional "sslmode" parameter for postgres ([#2152](https://github.com/fishtown-analytics/dbt/issues/2152), [#2154](https://github.com/fishtown-analytics/dbt/pull/2154))

### Under the hood
- Use `show terse schemas in database` (chosen based on data collected by Michael Weinberg) instead of `select ... from information_schema.schemata` when collecting the list of schemas in a database ([#2166](https://github.com/fishtown-analytics/dbt/issues/2166), [#2171](https://github.com/fishtown-analytics/dbt/pull/2171))
- Parallelize filling the cache and listing schemas in each database during startup ([#2127](https://github.com/fishtown-analytics/dbt/issues/2127), [#2157](https://github.com/fishtown-analytics/dbt/pull/2157))

Contributors:
- [@mhmcdonald](https://github.com/mhmcdonald) ([#2164](https://github.com/fishtown-analytics/dbt/pull/2164))
- [@dholleran-lendico](https://github.com/dholleran-lendico) ([#2154](https://github.com/fishtown-analytics/dbt/pull/2154))

## dbt 0.16.0b3 (February 26, 2020)

### Breaking changes
Expand All @@ -7,10 +25,8 @@
### Features
- Add a "docs" field to models, with a "show" subfield ([#1671](https://github.com/fishtown-analytics/dbt/issues/1671), [#2107](https://github.com/fishtown-analytics/dbt/pull/2107))
- Add a dbt-{dbt_version} user agent field to the bigquery connector ([#2121](https://github.com/fishtown-analytics/dbt/issues/2121), [#2146](https://github.com/fishtown-analytics/dbt/pull/2146))
- Adding optional "sslmode" parameter for postgres ([#2152](https://github.com/fishtown-analytics/dbt/issues/2152), [#2154](https://github.com/fishtown-analytics/dbt/pull/2154))
- Add support for `generate_database_name` macro ([#1695](https://github.com/fishtown-analytics/dbt/issues/1695), [#2143](https://github.com/fishtown-analytics/dbt/pull/2143))
- Expand the search path for schema.yml (and by extension, the default docs path) to include macro-paths and analysis-paths (in addition to source-paths, data-paths, and snapshot-paths) ([#2155](https://github.com/fishtown-analytics/dbt/issues/2155), [#2160](https://github.com/fishtown-analytics/dbt/pull/2160))
- Remove the requirement to have a passphrase when using Snowflake key pair authentication ([#1804](https://github.com/fishtown-analytics/dbt/issues/1805), [#2164](https://github.com/fishtown-analytics/dbt/pull/2164))

### Fixes
- Fix issue where dbt did not give an error in the presence of duplicate doc names ([#2054](https://github.com/fishtown-analytics/dbt/issues/2054), [#2080](https://github.com/fishtown-analytics/dbt/pull/2080))
Expand All @@ -19,14 +35,9 @@
- Fix an issue where dbt rendered source test args, fix issue where dbt ran an extra compile pass over the wrapped SQL. ([#2114](https://github.com/fishtown-analytics/dbt/issues/2114), [#2150](https://github.com/fishtown-analytics/dbt/pull/2150))
- Set more upper bounds for jinja2,requests, and idna dependencies, upgrade snowflake-connector-python ([#2147](https://github.com/fishtown-analytics/dbt/issues/2147), [#2151](https://github.com/fishtown-analytics/dbt/pull/2151))

### Under the hood
- Parallelize filling the cache and listing schemas in each database during startup ([#2127](https://github.com/fishtown-analytics/dbt/issues/2127), [#2157](https://github.com/fishtown-analytics/dbt/pull/2157))

Contributors:
- [@bubbomb](https://github.com/bubbomb) ([#2080](https://github.com/fishtown-analytics/dbt/pull/2080))
- [@sonac](https://github.com/sonac) ([#2078](https://github.com/fishtown-analytics/dbt/pull/2078))
- [@mhmcdonald](https://github.com/mhmcdonald) ([#2164](https://github.com/fishtown-analytics/dbt/pull/2164))
- [@dholleran-lendico](https://github.com/dholleran-lendico) ([#2154](https://github.com/fishtown-analytics/dbt/pull/2154))

## dbt 0.16.0b1 (February 11, 2020)

Expand Down
6 changes: 5 additions & 1 deletion core/dbt/adapters/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from dbt.contracts.connection import Credentials # noqa
from dbt.adapters.base.meta import available # noqa
from dbt.adapters.base.connections import BaseConnectionManager # noqa
from dbt.adapters.base.relation import BaseRelation, RelationType # noqa
from dbt.adapters.base.relation import ( # noqa
BaseRelation,
RelationType,
SchemaSearchMap,
)
from dbt.adapters.base.column import Column # noqa
from dbt.adapters.base.impl import BaseAdapter # noqa
from dbt.adapters.base.plugin import AdapterPlugin # noqa
62 changes: 7 additions & 55 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from dbt.adapters.base.connections import BaseConnectionManager, Connection
from dbt.adapters.base.meta import AdapterMeta, available
from dbt.adapters.base.relation import (
ComponentName, BaseRelation, InformationSchema
ComponentName, BaseRelation, InformationSchema, SchemaSearchMap
)
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.cache import RelationsCache
Expand Down Expand Up @@ -120,55 +120,6 @@ def _relation_name(rel: Optional[BaseRelation]) -> str:
return str(rel)


class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]):
"""A utility class to keep track of what information_schema tables to
search for what schemas
"""
def add(self, relation: BaseRelation):
key = relation.information_schema_only()
if key not in self:
self[key] = set()
lowered: Optional[str] = None
if relation.schema is not None:
lowered = relation.schema.lower()
self[key].add(lowered)

def search(self):
for information_schema_name, schemas in self.items():
for schema in schemas:
yield information_schema_name, schema

def schemas_searched(self):
result: Set[Tuple[str, str]] = set()
for information_schema_name, schemas in self.items():
result.update(
(information_schema_name.database, schema)
for schema in schemas
)
return result

def flatten(self):
new = self.__class__()

# make sure we don't have duplicates
seen = {r.database.lower() for r in self if r.database}
if len(seen) > 1:
raise_compiler_error(str(seen))

for information_schema_name, schema in self.search():
path = {
'database': information_schema_name.database,
'schema': schema
}
new.add(information_schema_name.incorporate(
path=path,
quote_policy={'database': False},
include_policy={'database': False},
))

return new


class BaseAdapter(metaclass=AdapterMeta):
"""The BaseAdapter provides an abstract base class for adapters.

Expand Down Expand Up @@ -1010,11 +961,12 @@ def execute_macro(

macro_function = MacroGenerator(macro, macro_context)

try:
result = macro_function(**kwargs)
finally:
if release:
self.release_connection()
with self.connections.exception_handler(f'macro {macro_name}'):
try:
result = macro_function(**kwargs)
finally:
if release:
self.release_connection()
return result

@classmethod
Expand Down
59 changes: 58 additions & 1 deletion core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from collections.abc import Mapping, Hashable
from dataclasses import dataclass, fields
from typing import (
Optional, TypeVar, Generic, Any, Type, Dict, Union, List, Iterator, Tuple
Optional, TypeVar, Generic, Any, Type, Dict, Union, List, Iterator, Tuple,
Set
)
from typing_extensions import Protocol

Expand Down Expand Up @@ -496,3 +497,59 @@ def _render_iterator(self):
for k, v in super()._render_iterator():
yield k, v
yield None, self.information_schema_view


class SchemaSearchMap(Dict[InformationSchema, Set[Optional[str]]]):
"""A utility class to keep track of what information_schema tables to
search for what schemas
"""
def add(self, relation: BaseRelation, preserve_case=False):
key = relation.information_schema_only()
if key not in self:
self[key] = set()
schema: Optional[str] = None
if relation.schema is not None:
if preserve_case:
schema = relation.schema
else:
schema = relation.schema.lower()
self[key].add(schema)

def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
for information_schema_name, schemas in self.items():
for schema in schemas:
yield information_schema_name, schema

def schemas_searched(self) -> Set[Tuple[str, Optional[str]]]:
result: Set[Tuple[str, Optional[str]]] = set()
for information_schema_name, schemas in self.items():
if information_schema_name.database is None:
raise InternalException(
'Got a None database in an information schema!'
)
result.update(
(information_schema_name.database, schema)
for schema in schemas
)
return result

def flatten(self):
new = self.__class__()

# make sure we don't have duplicates
seen = {r.database.lower() for r in self if r.database}
if len(seen) > 1:
dbt.exceptions.raise_compiler_error(str(seen))

for information_schema_name, schema in self.search():
path = {
'database': information_schema_name.database,
'schema': schema
}
new.add(information_schema_name.incorporate(
path=path,
quote_policy={'database': False},
include_policy={'database': False},
))

return new
17 changes: 0 additions & 17 deletions core/dbt/deprecations.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,6 @@ def show(self, *args, **kwargs) -> None:
active_deprecations.add(self.name)


class GenerateSchemaNameSingleArgDeprecated(DBTDeprecation):
_name = 'generate-schema-name-single-arg'

_description = '''\
As of dbt v0.14.0, the `generate_schema_name` macro accepts a second "node"
argument. The one-argument form of `generate_schema_name` is deprecated,
and will become unsupported in a future release.



For more information, see:

https://docs.getdbt.com/v0.14/docs/upgrading-to-014
'''


class MaterializationReturnDeprecation(DBTDeprecation):
_name = 'materialization-return'

Expand Down Expand Up @@ -166,7 +150,6 @@ def warn(name, *args, **kwargs):
active_deprecations: Set[str] = set()

deprecations_list: List[DBTDeprecation] = [
GenerateSchemaNameSingleArgDeprecated(),
MaterializationReturnDeprecation(),
NotADictionaryDeprecation(),
ColumnQuotingDeprecation(),
Expand Down
43 changes: 33 additions & 10 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Optional, Dict, List, Set, Tuple, Iterable

from dbt.task.base import ConfiguredTask
from dbt.adapters.base import SchemaSearchMap
from dbt.adapters.factory import get_adapter
from dbt.logger import (
GLOBAL_LOGGER as logger,
Expand Down Expand Up @@ -376,25 +377,38 @@ def interpret_results(self, results):
return len(failures) == 0

def get_model_schemas(
self, selected_uids: Iterable[str]
) -> Set[Tuple[str, str]]:
self, adapter, selected_uids: Iterable[str]
) -> SchemaSearchMap:
if self.manifest is None:
raise InternalException('manifest was None in get_model_schemas')
search_map = SchemaSearchMap()

schemas: Set[Tuple[str, str]] = set()
for node in self.manifest.nodes.values():
if node.unique_id not in selected_uids:
continue
if node.is_refable and not node.is_ephemeral:
schemas.add((node.database, node.schema))
relation = adapter.Relation.create_from(self.config, node)
# we're going to be creating these schemas, so preserve the
# case.
search_map.add(relation, preserve_case=True)

return schemas
return search_map

def create_schemas(self, adapter, selected_uids: Iterable[str]):
required_schemas = self.get_model_schemas(selected_uids)
required_databases = set(db for db, _ in required_schemas)
required_schemas = self.get_model_schemas(adapter, selected_uids)
# we want the string form of the information schema database
required_databases: List[str] = []
for info in required_schemas:
include_policy = info.include_policy.replace(
schema=False, identifier=False, database=True
)
db_only = info.replace(
include_policy=include_policy,
information_schema_view=None,
)
required_databases.append(str(db_only))

existing_schemas_lowered: Set[Tuple[str, str]] = set()
existing_schemas_lowered: Set[Tuple[str, Optional[str]]] = set()

def list_schemas(db: str) -> List[Tuple[str, str]]:
with adapter.connection_named(f'list_{db}'):
Expand All @@ -418,8 +432,17 @@ def create_schema(db: str, schema: str) -> None:
for ls_future in as_completed(list_futures):
existing_schemas_lowered.update(ls_future.result())

for db, schema in required_schemas:
db_schema = (db.lower(), schema.lower())
for info, schema in required_schemas.search():
if info.database is None:
raise InternalException(
'Got an information schema with no database!'
)
db: str = info.database
lower_schema: Optional[str] = None
if schema is not None:
lower_schema = schema.lower()

db_schema = (db.lower(), lower_schema)
if db_schema not in existing_schemas_lowered:
existing_schemas_lowered.add(db_schema)
create_futures.append(
Expand Down
8 changes: 6 additions & 2 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
import dbt.clients.agate_helper
import dbt.links

from dbt.adapters.base import BaseAdapter, available, RelationType
from dbt.adapters.base.impl import SchemaSearchMap
from dbt.adapters.base import (
BaseAdapter, available, RelationType, SchemaSearchMap
)
from dbt.adapters.bigquery.relation import (
BigQueryRelation, BigQueryInformationSchema
)
Expand Down Expand Up @@ -179,6 +180,9 @@ def rename_relation(

@available
def list_schemas(self, database: str) -> List[str]:
# the database string we get here is potentially quoted. Strip that off
# for the API call.
database = database.strip('`')
conn = self.connections.get_thread_connection()
client = conn.handle

Expand Down
22 changes: 20 additions & 2 deletions plugins/snowflake/dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Mapping, Any, Optional
from typing import Mapping, Any, Optional, List

import agate

from dbt.adapters.sql import SQLAdapter
from dbt.adapters.sql.impl import LIST_SCHEMAS_MACRO_NAME
from dbt.adapters.snowflake import SnowflakeConnectionManager
from dbt.adapters.snowflake import SnowflakeRelation
from dbt.adapters.snowflake import SnowflakeColumn
from dbt.contracts.graph.manifest import Manifest
from dbt.exceptions import RuntimeException
from dbt.exceptions import RuntimeException, DatabaseException
from dbt.utils import filter_null_values


Expand Down Expand Up @@ -81,3 +82,20 @@ def post_model_hook(
) -> None:
if context is not None:
self._use_warehouse(context)

def list_schemas(self, database: str) -> List[str]:
try:
results = self.execute_macro(
LIST_SCHEMAS_MACRO_NAME,
kwargs={'database': database}
)
except DatabaseException as exc:
msg = (
f'Database error while listing schemas in database '
f'"{database}"\n{exc}'
)
raise RuntimeException(msg)
# this uses 'show terse schemas in database', and the column name we
# want is 'name'

return [row['name'] for row in results]
Loading