Skip to content

Commit

Permalink
move the sql for getting different rows into dbt proper, from the test suite. Bump pytest dependency.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Jul 2, 2020
1 parent 3af8a22 commit 4cf2b78
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 40 deletions.
58 changes: 58 additions & 0 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,64 @@ def post_model_hook(self, config: Mapping[str, Any], context: Any) -> None:
"""
pass

def get_rows_different_sql(
    self,
    relation_a: BaseRelation,
    relation_b: BaseRelation,
    column_names: Optional[List[str]] = None,
    except_operator: str = 'EXCEPT',
) -> str:
    """Build the SQL for a query that compares two relations.

    The generated query (see ``COLUMNS_EQUAL_SQL``) returns a single row
    with two columns: ``row_count_difference``, the row count of
    ``relation_a`` minus the row count of ``relation_b``, and
    ``num_mismatched``, the number of rows found in one relation but not
    the other according to ``except_operator``.

    :param relation_a: first relation to compare; also the source of the
        column list when ``column_names`` is None.
    :param relation_b: second relation to compare.
    :param column_names: columns to compare. When None, the columns are
        looked up with ``get_columns_in_relation(relation_a)``.
    :param except_operator: SQL set-difference operator to splice into
        the query.
    :return: the formatted SQL string.
    """
    # This method only really exists for test reasons.
    if column_names is not None:
        raw_names = column_names
    else:
        raw_names = [
            col.name for col in self.get_columns_in_relation(relation_a)
        ]

    # Quote first, then order the quoted identifiers.
    quoted_names = [self.quote(name) for name in raw_names]
    quoted_names.sort()

    return COLUMNS_EQUAL_SQL.format(
        columns=', '.join(quoted_names),
        relation_a=str(relation_a),
        relation_b=str(relation_b),
        except_op=except_operator,
    )


# SQL template used to compare two relations. It is filled in with
# str.format() and expects four placeholders:
#   {columns}    - comma-separated list of column expressions to compare
#   {relation_a} - first relation
#   {relation_b} - second relation
#   {except_op}  - the set-difference operator to use
#
# The query yields one row with two columns:
#   row_count_difference - COUNT(*) of relation_a minus COUNT(*) of
#                          relation_b
#   num_mismatched       - number of rows in (a except b) plus the number
#                          of rows in (b except a)
#
# Both intermediate CTEs carry a constant ``1 as id`` so the two
# single-row results can be joined together in the final select.
# The trailing .strip() drops the newlines that surround the literal.
COLUMNS_EQUAL_SQL = '''
with diff_count as (
SELECT
1 as id,
COUNT(*) as num_missing FROM (
(SELECT {columns} FROM {relation_a} {except_op}
SELECT {columns} FROM {relation_b})
UNION ALL
(SELECT {columns} FROM {relation_b} {except_op}
SELECT {columns} FROM {relation_a})
) as a
), table_a as (
SELECT COUNT(*) as num_rows FROM {relation_a}
), table_b as (
SELECT COUNT(*) as num_rows FROM {relation_b}
), row_count_diff as (
select
1 as id,
table_a.num_rows - table_b.num_rows as difference
from table_a, table_b
)
select
row_count_diff.difference as row_count_difference,
diff_count.num_missing as num_mismatched
from row_count_diff
join diff_count using (id)
'''.strip()


def catch_as_completed(
futures # typing: List[Future[agate.Table]]
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ def __eq__(self, other):
def get_default_quote_policy(cls) -> Policy:
return cls._get_field_named('quote_policy').default

@classmethod
def get_default_include_policy(cls) -> Policy:
    """Return the default of this class's ``include_policy`` field."""
    include_field = cls._get_field_named('include_policy')
    return include_field.default

def get(self, key, default=None):
"""Override `.get` to return a metadata object so we don't break
dbt_utils.
Expand Down
2 changes: 1 addition & 1 deletion dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
freezegun==0.3.12
pytest==4.4.0
pytest==5.4.3
flake8>=3.5.0
pytz==2017.2
bumpversion==0.5.3
Expand Down
14 changes: 14 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,3 +738,17 @@ def grant_access_to(self, entity, entity_type, role, grant_target_dict):
access_entries.append(AccessEntry(role, entity_type, entity))
dataset.access_entries = access_entries
client.update_dataset(dataset, ['access_entries'])

def get_rows_different_sql(
    self,
    relation_a: BigQueryRelation,
    relation_b: BigQueryRelation,
    column_names: Optional[List[str]] = None,
    except_operator: str = 'EXCEPT DISTINCT',
) -> str:
    """Generate the row-comparison SQL with a BigQuery-specific default.

    This override only changes the default ``except_operator`` to
    ``EXCEPT DISTINCT``; every argument is forwarded unchanged to the
    parent implementation.
    NOTE(review): presumably BigQuery needs an explicit DISTINCT/ALL
    qualifier on EXCEPT - confirm against the BigQuery set-operator docs.

    :param relation_a: first relation to compare.
    :param relation_b: second relation to compare.
    :param column_names: columns to compare, or None to let the parent
        implementation decide.
    :param except_operator: SQL set-difference operator to use.
    :return: the SQL string produced by the parent implementation.
    """
    return super().get_rows_different_sql(
        relation_a=relation_a,
        relation_b=relation_b,
        column_names=column_names,
        except_operator=except_operator,
    )
43 changes: 4 additions & 39 deletions test/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,46 +880,11 @@ def get_models_in_schema(self, schema=None):
def _assertTablesEqualSql(self, relation_a, relation_b, columns=None):
    """Return SQL that compares ``relation_a`` with ``relation_b``.

    The query itself is built by ``self.adapter.get_rows_different_sql``;
    this helper only works out which column names to hand to it.

    :param relation_a: first relation to compare.
    :param relation_b: second relation to compare.
    :param columns: optional iterable of records whose first element is
        the column name. When None, the records are fetched with
        ``self.get_relation_columns(relation_a)``.
    :return: whatever ``get_rows_different_sql`` returns.
    """
    if columns is None:
        columns = self.get_relation_columns(relation_a)
    # Each record's first element is the column name.
    column_names = [c[0] for c in columns]

    # The adapter owns the comparison SQL (including the adapter-specific
    # EXCEPT operator), so nothing is assembled inline here any more.
    sql = self.adapter.get_rows_different_sql(
        relation_a, relation_b, column_names
    )

    return sql

Expand Down

0 comments on commit 4cf2b78

Please sign in to comment.