Skip to content

Commit

Permalink
Attempt changing over to system for metadata gathering (#692)
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db authored Jun 6, 2024
1 parent ef7782a commit 7fd525c
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Fixes

- Undo the removal of spark.sql.sources.partitionOverwriteMode = DYNAMIC ([688](https://github.com/databricks/dbt-databricks/pull/688))
- Migrate to using system.information_schema to fix issue with catalog renames ([692](https://github.com/databricks/dbt-databricks/pull/692))
- Cancel python jobs when dbt operation is canceled (thanks @gaoshihang for kicking this off!) ([693](https://github.com/databricks/dbt-databricks/pull/693))

## dbt-databricks 1.8.1 (May 29, 2024)
Expand Down
5 changes: 1 addition & 4 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,10 +749,7 @@ def _describe_relation(

@staticmethod
def _get_information_schema_views(adapter: DatabricksAdapter, kwargs: Dict[str, Any]) -> "Row":
row = get_first_row(adapter.execute_macro("get_view_description", kwargs=kwargs))
if "view_definition" in row.keys() and row["view_definition"] is not None:
return row
return get_first_row(adapter.execute_macro("get_view_description_alt", kwargs=kwargs))
return get_first_row(adapter.execute_macro("get_view_description", kwargs=kwargs))


class StreamingTableAPI(DeltaLiveTableAPIBase[StreamingTableConfig]):
Expand Down
18 changes: 10 additions & 8 deletions dbt/include/databricks/macros/adapters/catalog.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
{% set query %}
with tables as (
{{ databricks__get_catalog_tables_sql(information_schema) }}
{{ databricks__get_catalog_schemas_where_clause_sql(schemas) }}
{{ databricks__get_catalog_schemas_where_clause_sql(information_schema.database, schemas) }}
),
columns as (
{{ databricks__get_catalog_columns_sql(information_schema) }}
{{ databricks__get_catalog_schemas_where_clause_sql(schemas) }}
{{ databricks__get_catalog_schemas_where_clause_sql(information_schema.database, schemas) }}
)
{{ databricks__get_catalog_results_sql() }}
{%- endset -%}
Expand Down Expand Up @@ -44,7 +44,8 @@
last_altered as `stats:last_modified:value`,
'The timestamp for last update/change' as `stats:last_modified:description`,
(last_altered is not null and table_type not ilike '%VIEW%') as `stats:last_modified:include`
from {{ information_schema }}.tables
from `system`.`information_schema`.`tables`
where table_catalog = '{{ information_schema.database }}'
{%- endmacro %}

{% macro databricks__get_catalog_columns_sql(information_schema) -%}
Expand All @@ -56,7 +57,8 @@
ordinal_position as column_index,
lower(data_type) as column_type,
comment as column_comment
from {{ information_schema }}.columns
from `system`.`information_schema`.`columns`
where table_catalog = '{{ information_schema.database }}'
{%- endmacro %}

{% macro databricks__get_catalog_results_sql() -%}
Expand All @@ -66,15 +68,15 @@
order by column_index
{%- endmacro %}

{% macro databricks__get_catalog_schemas_where_clause_sql(schemas) -%}
where ({%- for relation in schemas -%}
{% macro databricks__get_catalog_schemas_where_clause_sql(catalog, schemas) -%}
where table_catalog = '{{ catalog }}' and ({%- for relation in schemas -%}
table_schema = lower('{{ relation[1] }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%})
{%- endmacro %}


{% macro databricks__get_catalog_relations_where_clause_sql(relations) -%}
where (
{% macro databricks__get_catalog_relations_where_clause_sql(catalog, relations) -%}
where table_catalog = '{{ catalog }}' and (
{%- for relation in relations -%}
{% if relation.schema and relation.identifier %}
(
Expand Down
21 changes: 7 additions & 14 deletions dbt/include/databricks/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@
table_name as identifier,
last_altered as last_modified,
{{ current_timestamp() }} as snapshotted_at
from {{ information_schema }}.tables
where (
from `system`.`information_schema`.`tables`
where table_catalog = '{{ information_schema.database }}' and
(
{%- for relation in relations -%}
(table_schema = '{{ relation.schema }}' and
table_name = '{{ relation.identifier }}'){%- if not loop.last %} or {% endif -%}
Expand All @@ -74,23 +75,15 @@
{% macro get_view_description(relation) %}
{% call statement('get_view_description', fetch_result=True) -%}
select *
from {{ relation.information_schema() }}.`views`
where table_schema = '{{ relation.schema }}'
from `system`.`information_schema`.`views`
where table_catalog = '{{ relation.database }}'
and table_schema = '{{ relation.schema }}'
and table_name = '{{ relation.identifier }}'
{%- endcall -%}

{% do return(load_result('get_view_description').table) %}
{% endmacro %}

{% macro get_view_description_alt(relation) %}
{% call statement('get_view_description_alt', fetch_result=True) -%}
select *
from `system`.`information_schema`.`views`
where table_catalog = '{{ relation.database }}'
and table_schema = '{{ relation.schema }}'
and table_name = '{{ relation.identifier }}'
{% endcall %}

{% do return(load_result('get_view_description_alt').table) %}
{% endmacro %}

Expand All @@ -101,7 +94,7 @@
if(table_type in ('EXTERNAL', 'MANAGED', 'MANAGED_SHALLOW_CLONE'), 'table', lower(table_type)) as table_type,
lower(data_source_format) as file_format,
table_owner
from `{{ relation.database }}`.`information_schema`.`tables`
from `system`.`information_schema`.`tables`
where table_schema = '{{ relation.schema }}'
{% if relation.identifier %}
and table_name = '{{ relation.identifier }}'
Expand Down
6 changes: 4 additions & 2 deletions dbt/include/databricks/macros/relations/tags.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

{% macro fetch_tags_sql(relation) -%}
SELECT tag_name, tag_value
FROM `{{ relation.database }}`.`information_schema`.`table_tags`
WHERE schema_name = '{{ relation.schema }}' AND table_name = '{{ relation.identifier }}'
FROM `system`.`information_schema`.`table_tags`
WHERE catalog_name = '{{ relation.database }}'
AND schema_name = '{{ relation.schema }}'
AND table_name = '{{ relation.identifier }}'
{%- endmacro -%}

{% macro apply_tags(relation, set_tags, unset_tags=[]) -%}
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


def pytest_addoption(parser):
parser.addoption("--profile", action="store", default="databricks_uc_cluster", type=str)
parser.addoption("--profile", action="store", default="databricks_uc_sql_endpoint", type=str)


# Using @pytest.mark.skip_profile('databricks_cluster') uses the 'skip_by_adapter_type'
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/adapter/incremental/test_incremental_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_changing_tags(self, project):
util.write_file(fixtures.tags_b, "models", "schema.yml")
util.run_dbt(["run"])
results = project.run_sql(
"select tag_name, tag_value from {database}.information_schema.table_tags "
"select tag_name, tag_value from `system`.`information_schema`.`table_tags` "
"where schema_name = '{schema}' and table_name='merge_update_columns_sql'",
fetch="all",
)
Expand All @@ -42,7 +42,7 @@ def test_changing_tags(self, project):
util.write_file(fixtures.python_schema2, "models", "schema.yml")
util.run_dbt(["run"])
results = project.run_sql(
"select tag_name, tag_value from {database}.information_schema.table_tags "
"select tag_name, tag_value from `system`.`information_schema`.`table_tags` "
"where schema_name = '{schema}' and table_name='tags'",
fetch="all",
)
Expand Down
11 changes: 7 additions & 4 deletions tests/functional/adapter/materialized_view_tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
table_type = project.run_sql(
f"select table_type from {relation.information_schema_only()}."
f"`tables` where table_schema = '{relation.schema}'"
"select table_type from `system`.`information_schema`.`tables`"
f" where table_catalog = '{relation.database}'"
f" and table_schema = '{relation.schema}'"
f" and table_name = '{relation.identifier}'",
fetch="one",
)[0]
Expand All @@ -17,8 +18,10 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
return DatabricksRelationType.Table.value
else:
is_materialized = project.run_sql(
f"select is_materialized from {relation.information_schema_only()}."
f"`views` where table_name = '{relation.identifier}'",
"select is_materialized from `system`.`information_schema`.`views`"
f" where table_catalog = '{relation.database}'"
f" and table_schema = '{relation.schema}'"
f" and table_name = '{relation.identifier}'",
fetch="one",
)[0]
if is_materialized == "TRUE":
Expand Down
11 changes: 7 additions & 4 deletions tests/functional/adapter/streaming_tables/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
table_type = project.run_sql(
f"select table_type from {relation.information_schema_only()}."
f"`tables` where table_schema = '{relation.schema}'"
"select table_type from `system`.`information_schema`.`tables`"
f" where table_catalog = '{relation.database}'"
f" and table_schema = '{relation.schema}'"
f" and table_name = '{relation.identifier}'",
fetch="one",
)[0]
Expand All @@ -17,8 +18,10 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
return DatabricksRelationType.Table.value
else:
is_materialized = project.run_sql(
f"select is_materialized from {relation.information_schema_only()}."
f"`views` where table_name = '{relation.identifier}'",
"select is_materialized from `system`.`information_schema`.`views`"
f" where table_catalog = '{relation.database}'"
f" and table_schema = '{relation.schema}'"
f" and table_name = '{relation.identifier}'",
fetch="one",
)[0]
if is_materialized == "TRUE":
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/adapter/tags/test_databricks_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ def test_tags(self, project):
_ = util.run_dbt(["run"])
_ = util.run_dbt(["run"])
results = project.run_sql(
"select tag_name, tag_value from {database}.information_schema.table_tags "
"where schema_name = '{schema}' and table_name='tags'",
"select tag_name, tag_value from `system`.`information_schema`.`table_tags`"
" where schema_name = '{schema}' and table_name='tags'",
fetch="all",
)
assert len(results) == 2
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/macros/relations/test_tags_macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ def test_macros_fetch_tags_sql(self, template_bundle):
sql = self.render_bundle(template_bundle, "fetch_tags_sql")
expected = (
"SELECT tag_name, tag_value "
"FROM `some_database`.`information_schema`.`table_tags` "
"WHERE schema_name = 'some_schema' AND table_name = 'some_table'"
"FROM `system`.`information_schema`.`table_tags` "
"WHERE catalog_name = 'some_database'"
" AND schema_name = 'some_schema' AND table_name = 'some_table'"
)
assert sql == expected

Expand Down

0 comments on commit 7fd525c

Please sign in to comment.