diff --git a/.changes/1.13.0.md b/.changes/1.13.0.md new file mode 100644 index 00000000..2fade0c2 --- /dev/null +++ b/.changes/1.13.0.md @@ -0,0 +1,13 @@ +## dbt-adapters 1.13.0 - December 19, 2024 + +### Features + +- Add function to run custom sql for getting freshness info ([#8797](https://github.com/dbt-labs/dbt-adapters/issues/8797)) + +### Fixes + +- Use `sql` instead of `compiled_code` within the default `get_limit_sql` macro ([#372](https://github.com/dbt-labs/dbt-adapters/issues/372)) + +### Under the Hood + +- Adapter tests for new snapshot configs ([#380](https://github.com/dbt-labs/dbt-adapters/issues/380)) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 04a14545..421a66ad 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -58,11 +58,45 @@ jobs: branch: ${{ needs.generate-changelog.outputs.branch-name }} secrets: inherit - publish-pypi: + package: if: ${{ inputs.pypi-public == true }} - needs: generate-changelog - uses: ./.github/workflows/_publish-pypi.yml + uses: ./.github/workflows/_package-directory.yml with: package: ${{ inputs.package }} - deploy-to: ${{ inputs.deploy-to }} - branch: ${{ needs.generate-changelog.outputs.branch-name }} + + publish-pypi: + if: ${{ inputs.pypi-public == true }} + needs: [package, generate-changelog] + runs-on: ${{ vars.DEFAULT_RUNNER }} + environment: + name: ${{ inputs.deploy-to }} + url: ${{ vars.PYPI_PROJECT_URL }}/${{ inputs.package }} + permissions: + # this permission is required for trusted publishing + # see https://github.com/marketplace/actions/pypi-publish + id-token: write + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ needs.generate-changelog.outputs.branch-name }} + - uses: actions/setup-python@v5 + with: + python-version: ${{ vars.DEFAULT_PYTHON_VERSION }} + - uses: pypa/hatch@install + # hatch will build using test PyPI first and fall back to prod PyPI when deploying to test + # this is done via environment variables in the test environment in GitHub + - run: hatch build && hatch run build:check-all + working-directory: ./${{ needs.package.outputs.directory }} + - uses: pypa/gh-action-pypi-publish@release/v1 + with: + repository-url: ${{ vars.PYPI_REPOSITORY_URL }} + packages-dir: ./${{ needs.package.outputs.directory }}dist/ + - id: version + run: echo "version=$(hatch version)" >> $GITHUB_OUTPUT + working-directory: ./${{ needs.package.outputs.directory }} + - uses: nick-fields/retry@v3 + with: + timeout_seconds: 10 + retry_wait_seconds: 10 + max_attempts: 15 # 5 minutes: (10s timeout + 10s delay) * 15 attempts + command: wget ${{ vars.PYPI_PROJECT_URL }}/${{ steps.version.outputs.version }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 47a19fcb..fde4210c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,23 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html), and is generated by [Changie](https://github.com/miniscruff/changie). 
-## dbt-adapters 1.12.0 - December 18, 2024 +## dbt-adapters 1.13.0 - December 19, 2024 + +### Features + +- Add function to run custom sql for getting freshness info ([#8797](https://github.com/dbt-labs/dbt-adapters/issues/8797)) + +### Fixes +- Use `sql` instead of `compiled_code` within the default `get_limit_sql` macro ([#372](https://github.com/dbt-labs/dbt-adapters/issues/372)) +### Under the Hood + +- Adapter tests for new snapshot configs ([#380](https://github.com/dbt-labs/dbt-adapters/issues/380)) + + + +## dbt-adapters 1.12.0 - December 18, 2024 ## dbt-adapters 1.11.0 - December 17, 2024 diff --git a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/fixtures.py b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/fixtures.py new file mode 100644 index 00000000..cec28a7d --- /dev/null +++ b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/fixtures.py @@ -0,0 +1,430 @@ +create_seed_sql = """ +create table {schema}.seed ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP +); +""" + +create_snapshot_expected_sql = """ +create table {schema}.snapshot_expected ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP, + test_valid_from TIMESTAMP, + test_valid_to TIMESTAMP, + test_scd_id TEXT, + test_updated_at TIMESTAMP +); +""" + + +seed_insert_sql = """ +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', 
+'253.31.108.22', '2015-12-11 04:34:27'),
+(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'),
+(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19');
+"""
+
+
+populate_snapshot_expected_sql = """
+-- populate snapshot table
+insert into {schema}.snapshot_expected (
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    test_valid_from,
+    test_valid_to,
+    test_updated_at,
+    test_scd_id
+)
+
+select
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    -- fields added by snapshotting
+    updated_at as test_valid_from,
+    null::timestamp as test_valid_to,
+    updated_at as test_updated_at,
+    md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id
+from {schema}.seed;
+"""
+
+populate_snapshot_expected_valid_to_current_sql = """
+-- populate snapshot table
+insert into {schema}.snapshot_expected (
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    test_valid_from,
+    test_valid_to,
+    test_updated_at,
+    test_scd_id
+)
+
+select
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    -- fields added by snapshotting
+    updated_at as test_valid_from,
+    date('2099-12-31') as test_valid_to,
+    updated_at as test_updated_at,
+    md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id
+from {schema}.seed;
+"""
+
+snapshot_actual_sql = """
+{% snapshot snapshot_actual %}
+
+    {{
+        config(
+            unique_key='id || ' ~ "'-'" ~ ' || first_name',
+        )
+    }}
+
+    select * from {{target.database}}.{{target.schema}}.seed
+
+{% endsnapshot %}
+"""
+
+snapshots_yml = """
+snapshots:
+  - name: snapshot_actual
+    config:
+      strategy: timestamp
+      updated_at: updated_at
+      snapshot_meta_column_names:
+        dbt_valid_to: test_valid_to
+        dbt_valid_from: test_valid_from
+        dbt_scd_id: test_scd_id
+        dbt_updated_at: test_updated_at
+"""
+
+snapshots_no_column_names_yml = """
+snapshots:
+  - name: snapshot_actual
+    config:
+      strategy: timestamp
+      updated_at: updated_at
+"""
+
+ref_snapshot_sql = """
+select * from {{ ref('snapshot_actual') }}
+"""
+
+
+invalidate_sql = """
+-- update records 11 - 21. Change email and updated_at field
+update {schema}.seed set
+    updated_at = updated_at + interval '1 hour',
+    email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end
+where id >= 10 and id <= 20;
+
+
+-- invalidate records 11 - 21
+update {schema}.snapshot_expected set
+    test_valid_to = updated_at + interval '1 hour'
+where id >= 10 and id <= 20;
+
+"""
+
+update_sql = """
+-- insert v2 of the 11 - 21 records
+
+insert into {schema}.snapshot_expected (
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    test_valid_from,
+    test_valid_to,
+    test_updated_at,
+    test_scd_id
+)
+
+select
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    -- fields added by snapshotting
+    updated_at as test_valid_from,
+    null::timestamp as test_valid_to,
+    updated_at as test_updated_at,
+    md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id
+from {schema}.seed
+where id >= 10 and id <= 20;
+"""
+
+# valid_to_current fixtures
+
+snapshots_valid_to_current_yml = """
+snapshots:
+  - name: snapshot_actual
+    config:
+      strategy: timestamp
+      updated_at: updated_at
+      dbt_valid_to_current: "date('2099-12-31')"
+      snapshot_meta_column_names:
+        dbt_valid_to: test_valid_to
+        dbt_valid_from: test_valid_from
+        dbt_scd_id: test_scd_id
+        dbt_updated_at: test_updated_at
+"""
+
+update_with_current_sql = """
+-- insert v2 of the 11 - 21 records
+
+insert into {schema}.snapshot_expected (
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    test_valid_from,
+    test_valid_to,
+    test_updated_at,
+    test_scd_id
+)
+
+select
+    id,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    -- fields added by snapshotting
+    updated_at as test_valid_from,
+    date('2099-12-31') as test_valid_to,
+    updated_at as test_updated_at,
+    md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id
+from {schema}.seed
+where id >= 10 and id <= 20;
+"""
+
+
+# multi-key snapshot fixtures
+
+create_multi_key_seed_sql = """
+create table {schema}.seed (
+    id1 INTEGER,
+    id2 INTEGER,
+    first_name VARCHAR(50),
+    last_name VARCHAR(50),
+    email VARCHAR(50),
+    gender VARCHAR(50),
+    ip_address VARCHAR(20),
+    updated_at TIMESTAMP
+);
+"""
+
+
+create_multi_key_snapshot_expected_sql = """
+create table {schema}.snapshot_expected (
+    id1 INTEGER,
+    id2 INTEGER,
+    first_name VARCHAR(50),
+    last_name VARCHAR(50),
+    email VARCHAR(50),
+    gender VARCHAR(50),
+    ip_address VARCHAR(20),
+
+    -- snapshotting fields
+    updated_at TIMESTAMP,
+    test_valid_from TIMESTAMP,
+    test_valid_to TIMESTAMP,
+    test_scd_id TEXT,
+    test_updated_at TIMESTAMP
+);
+"""
+
+seed_multi_key_insert_sql = """
+-- seed inserts
+-- use the same email for two users to verify that duplicated check_cols values
+-- are handled appropriately
+insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values
+(1, 100, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'),
+(2, 200, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'),
+(3, 300, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'),
+(4, 400, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'),
+(5, 500, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'),
+(6, 600, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'),
+(7, 700, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 800, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 900, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 1000, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 1100, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 1200, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 1300, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 1400, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 1500, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 1600, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 1700, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 1800, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'), +(19, 1900, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), +(20, 2000, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); +""" + +populate_multi_key_snapshot_expected_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + null::timestamp as test_valid_to, + updated_at as test_updated_at, + md5(id1::text || '|' || id2::text || '|' || updated_at::text) as test_scd_id +from {schema}.seed; +""" + +model_seed_sql = """ +select * from {{target.database}}.{{target.schema}}.seed +""" + +snapshots_multi_key_yml = """ +snapshots: + - name: snapshot_actual + relation: "ref('seed')" + config: + strategy: timestamp + updated_at: updated_at + unique_key: + - id1 + - id2 + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + +invalidate_multi_key_sql = """ +-- update records 11 - 21. 
+-- Change email and updated_at field
+update {schema}.seed set
+    updated_at = updated_at + interval '1 hour',
+    email = case when id1 = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end
+where id1 >= 10 and id1 <= 20;
+
+
+-- invalidate records 11 - 21
+update {schema}.snapshot_expected set
+    test_valid_to = updated_at + interval '1 hour'
+where id1 >= 10 and id1 <= 20;
+
+"""
+
+update_multi_key_sql = """
+-- insert v2 of the 11 - 21 records
+
+insert into {schema}.snapshot_expected (
+    id1,
+    id2,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    test_valid_from,
+    test_valid_to,
+    test_updated_at,
+    test_scd_id
+)
+
+select
+    id1,
+    id2,
+    first_name,
+    last_name,
+    email,
+    gender,
+    ip_address,
+    updated_at,
+    -- fields added by snapshotting
+    updated_at as test_valid_from,
+    null::timestamp as test_valid_to,
+    updated_at as test_updated_at,
+    md5(id1::text || '|' || id2::text || '|' || updated_at::text) as test_scd_id
+from {schema}.seed
+where id1 >= 10 and id1 <= 20;
+"""
diff --git a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/test_various_configs.py b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/test_various_configs.py
new file mode 100644
index 00000000..d4b162a9
--- /dev/null
+++ b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/test_various_configs.py
@@ -0,0 +1,254 @@
+import datetime
+
+import pytest
+
+from dbt.tests.adapter.simple_snapshot.fixtures import (
+    create_multi_key_seed_sql,
+    create_multi_key_snapshot_expected_sql,
+    create_seed_sql,
+    create_snapshot_expected_sql,
+    invalidate_multi_key_sql,
+    invalidate_sql,
+    model_seed_sql,
+    populate_multi_key_snapshot_expected_sql,
+    populate_snapshot_expected_sql,
+    populate_snapshot_expected_valid_to_current_sql,
+    ref_snapshot_sql,
+    seed_insert_sql,
+    seed_multi_key_insert_sql,
+    snapshot_actual_sql,
+    snapshots_multi_key_yml,
+    snapshots_no_column_names_yml,
+    snapshots_valid_to_current_yml,
+    snapshots_yml,
+    update_multi_key_sql,
+    update_sql,
+    update_with_current_sql,
+)
+from dbt.tests.util import (
+    check_relations_equal,
+    get_manifest,
+    run_dbt,
+    run_dbt_and_capture,
+    run_sql_with_adapter,
+    update_config_file,
+)
+
+
+class BaseSnapshotColumnNames:
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {"snapshot.sql": snapshot_actual_sql}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "snapshots.yml": snapshots_yml,
+            "ref_snapshot.sql": ref_snapshot_sql,
+        }
+
+    def test_snapshot_column_names(self, project):
+        project.run_sql(create_seed_sql)
+        project.run_sql(create_snapshot_expected_sql)
+        project.run_sql(seed_insert_sql)
+        project.run_sql(populate_snapshot_expected_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        project.run_sql(invalidate_sql)
+        project.run_sql(update_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])
+
+
+class BaseSnapshotColumnNamesFromDbtProject:
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {"snapshot.sql": snapshot_actual_sql}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "snapshots.yml": snapshots_no_column_names_yml,
+            "ref_snapshot.sql": ref_snapshot_sql,
+        }
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "snapshots": {
+                "test": {
+                    "+snapshot_meta_column_names": {
+                        "dbt_valid_to": "test_valid_to",
+                        "dbt_valid_from": "test_valid_from",
+                        "dbt_scd_id": "test_scd_id",
+                        "dbt_updated_at": "test_updated_at",
"dbt_updated_at": "test_updated_at", + } + } + } + } + + def test_snapshot_column_names_from_project(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +class BaseSnapshotInvalidColumnNames: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_no_column_names_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + } + } + } + + def test_snapshot_invalid_column_names(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + manifest = get_manifest(project.project_root) + snapshot_node = manifest.nodes["snapshot.test.snapshot_actual"] + snapshot_node.config.snapshot_meta_column_names == { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + # Change snapshot_meta_columns and look for an error + different_columns = { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_updated_at": "test_updated_at", + } + } + } + } + update_config_file(different_columns, "dbt_project.yml") + + results, log_output = run_dbt_and_capture(["snapshot"], expect_pass=False) + assert len(results) == 1 + assert "Compilation Error in snapshot snapshot_actual" in log_output + assert "Snapshot target is missing configured columns" in log_output + + +class BaseSnapshotDbtValidToCurrent: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_valid_to_current_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_valid_to_current(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_valid_to_current_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + original_snapshot = run_sql_with_adapter( + project.adapter, + "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual", + "all", + ) + assert original_snapshot[0][2] == datetime.datetime(2099, 12, 31, 0, 0) + assert original_snapshot[9][2] == datetime.datetime(2099, 12, 31, 0, 0) + + project.run_sql(invalidate_sql) + project.run_sql(update_with_current_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + updated_snapshot = run_sql_with_adapter( + project.adapter, + "select id, test_scd_id, test_valid_to from 
{schema}.snapshot_actual", + "all", + ) + assert updated_snapshot[0][2] == datetime.datetime(2099, 12, 31, 0, 0) + # Original row that was updated now has a non-current (2099/12/31) date + assert updated_snapshot[9][2] == datetime.datetime(2016, 8, 20, 16, 44, 49) + # Updated row has a current date + assert updated_snapshot[20][2] == datetime.datetime(2099, 12, 31, 0, 0) + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +# This uses snapshot_meta_column_names, yaml-only snapshot def, +# and multiple keys +class BaseSnapshotMultiUniqueKey: + @pytest.fixture(scope="class") + def models(self): + return { + "seed.sql": model_seed_sql, + "snapshots.yml": snapshots_multi_key_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_multi_column_unique_key(self, project): + project.run_sql(create_multi_key_seed_sql) + project.run_sql(create_multi_key_snapshot_expected_sql) + project.run_sql(seed_multi_key_insert_sql) + project.run_sql(populate_multi_key_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_multi_key_sql) + project.run_sql(update_multi_key_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) diff --git a/dbt-tests-adapter/dbt/tests/adapter/utils/test_source_freshness_custom_info.py b/dbt-tests-adapter/dbt/tests/adapter/utils/test_source_freshness_custom_info.py new file mode 100644 index 00000000..b4f15dab --- /dev/null +++ b/dbt-tests-adapter/dbt/tests/adapter/utils/test_source_freshness_custom_info.py @@ -0,0 +1,70 @@ +from typing import Type +from unittest.mock import MagicMock + +from dbt_common.exceptions import DbtRuntimeError +import pytest + +from dbt.adapters.base.impl import BaseAdapter + + +class BaseCalculateFreshnessMethod: + """Tests the behavior of the calculate_freshness_from_customsql method for the relevant adapters. + + The base method is meant to throw the appropriate custom exception when calculate_freshness_from_customsql + fails. + """ + + @pytest.fixture(scope="class") + def valid_sql(self) -> str: + """Returns a valid statement for issuing as a validate_sql query. + + Ideally this would be checkable for non-execution. For example, we could use a + CREATE TABLE statement with an assertion that no table was created. However, + for most adapter types this is unnecessary - the EXPLAIN keyword has exactly the + behavior we want, and here we are essentially testing to make sure it is + supported. As such, we return a simple SELECT query, and leave it to + engine-specific test overrides to specify more detailed behavior as appropriate. + """ + + return "select now()" + + @pytest.fixture(scope="class") + def invalid_sql(self) -> str: + """Returns an invalid statement for issuing a bad validate_sql query.""" + + return "Let's run some invalid SQL and see if we get an error!" + + @pytest.fixture(scope="class") + def expected_exception(self) -> Type[Exception]: + """Returns the Exception type thrown by a failed query. 
+ + Defaults to dbt_common.exceptions.DbtRuntimeError because that is the most common + base exception for adapters to throw.""" + return DbtRuntimeError + + @pytest.fixture(scope="class") + def mock_relation(self): + mock = MagicMock() + mock.__str__ = lambda x: "test.table" + return mock + + def test_calculate_freshness_from_custom_sql_success( + self, adapter: BaseAdapter, valid_sql: str, mock_relation + ) -> None: + with adapter.connection_named("test_freshness_custom_sql"): + adapter.calculate_freshness_from_custom_sql(mock_relation, valid_sql) + + def test_calculate_freshness_from_custom_sql_failure( + self, + adapter: BaseAdapter, + invalid_sql: str, + expected_exception: Type[Exception], + mock_relation, + ) -> None: + with pytest.raises(expected_exception=expected_exception): + with adapter.connection_named("test_infreshness_custom_sql"): + adapter.calculate_freshness_from_custom_sql(mock_relation, invalid_sql) + + +class TestCalculateFreshnessMethod(BaseCalculateFreshnessMethod): + pass diff --git a/dbt/adapters/__about__.py b/dbt/adapters/__about__.py index 134ed009..667df30e 100644 --- a/dbt/adapters/__about__.py +++ b/dbt/adapters/__about__.py @@ -1 +1 @@ -version = "1.12.0" +version = "1.13.0" diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index ae172635..8474b39d 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -97,6 +97,7 @@ GET_CATALOG_MACRO_NAME = "get_catalog" GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations" FRESHNESS_MACRO_NAME = "collect_freshness" +CUSTOM_SQL_FRESHNESS_MACRO_NAME = "collect_freshness_custom_sql" GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified" DEFAULT_BASE_BEHAVIOR_FLAGS = [ { @@ -1327,6 +1328,31 @@ def cancel_open_connections(self): """Cancel all open connections.""" return self.connections.cancel_open() + def _process_freshness_execution( + self, + macro_name: str, + kwargs: Dict[str, Any], + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: + """Execute and process a freshness macro to generate a FreshnessResponse""" + import agate + + result = self.execute_macro(macro_name, kwargs=kwargs, macro_resolver=macro_resolver) + + if isinstance(result, agate.Table): + warn_or_error(CollectFreshnessReturnSignature()) + table = result + adapter_response = None + else: + adapter_response, table = result.response, result.table + + # Process the results table + if len(table) != 1 or len(table[0]) != 2: + raise MacroResultError(macro_name, table) + + freshness_response = self._create_freshness_response(table[0][0], table[0][1]) + return adapter_response, freshness_response + def calculate_freshness( self, source: BaseRelation, @@ -1335,49 +1361,26 @@ def calculate_freshness( macro_resolver: Optional[MacroResolverProtocol] = None, ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: """Calculate the freshness of sources in dbt, and return it""" - import agate - - kwargs: Dict[str, Any] = { + kwargs = { "source": source, "loaded_at_field": loaded_at_field, "filter": filter, } + return self._process_freshness_execution(FRESHNESS_MACRO_NAME, kwargs, macro_resolver) - # run the macro - # in older versions of dbt-core, the 'collect_freshness' macro returned the table of results directly - # starting in v1.5, by default, we return both the table and the adapter response (metadata about the query) - result: Union[ - AttrDict, # current: contains AdapterResponse + "agate.Table" - "agate.Table", # previous: just table - ] - 
result = self.execute_macro( - FRESHNESS_MACRO_NAME, kwargs=kwargs, macro_resolver=macro_resolver - ) - if isinstance(result, agate.Table): - warn_or_error(CollectFreshnessReturnSignature()) - adapter_response = None - table = result - else: - adapter_response, table = result.response, result.table # type: ignore[attr-defined] - # now we have a 1-row table of the maximum `loaded_at_field` value and - # the current time according to the db. - if len(table) != 1 or len(table[0]) != 2: - raise MacroResultError(FRESHNESS_MACRO_NAME, table) - if table[0][0] is None: - # no records in the table, so really the max_loaded_at was - # infinitely long ago. Just call it 0:00 January 1 year UTC - max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) - else: - max_loaded_at = _utc(table[0][0], source, loaded_at_field) - - snapshotted_at = _utc(table[0][1], source, loaded_at_field) - age = (snapshotted_at - max_loaded_at).total_seconds() - freshness: FreshnessResponse = { - "max_loaded_at": max_loaded_at, - "snapshotted_at": snapshotted_at, - "age": age, + def calculate_freshness_from_custom_sql( + self, + source: BaseRelation, + sql: str, + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: + kwargs = { + "source": source, + "loaded_at_query": sql, } - return adapter_response, freshness + return self._process_freshness_execution( + CUSTOM_SQL_FRESHNESS_MACRO_NAME, kwargs, macro_resolver + ) def calculate_freshness_from_metadata_batch( self, diff --git a/dbt/include/global_project/macros/adapters/freshness.sql b/dbt/include/global_project/macros/adapters/freshness.sql index f18499a2..1af6165c 100644 --- a/dbt/include/global_project/macros/adapters/freshness.sql +++ b/dbt/include/global_project/macros/adapters/freshness.sql @@ -14,3 +14,19 @@ {% endcall %} {{ return(load_result('collect_freshness')) }} {% endmacro %} + +{% macro collect_freshness_custom_sql(source, loaded_at_query) %} + {{ return(adapter.dispatch('collect_freshness_custom_sql', 'dbt')(source, loaded_at_query))}} +{% endmacro %} + +{% macro default__collect_freshness_custom_sql(source, loaded_at_query) %} + {% call statement('collect_freshness_custom_sql', fetch_result=True, auto_begin=False) -%} + with source_query as ( + {{ loaded_at_query }} + ) + select + (select * from source_query) as max_loaded_at, + {{ current_timestamp() }} as snapshotted_at + {% endcall %} + {{ return(load_result('collect_freshness_custom_sql')) }} +{% endmacro %} diff --git a/dbt/include/global_project/macros/adapters/show.sql b/dbt/include/global_project/macros/adapters/show.sql index 3a5faa98..fb17bb96 100644 --- a/dbt/include/global_project/macros/adapters/show.sql +++ b/dbt/include/global_project/macros/adapters/show.sql @@ -19,7 +19,7 @@ {%- endmacro -%} {% macro default__get_limit_sql(sql, limit) %} - {{ compiled_code }} + {{ sql }} {% if limit is not none %} limit {{ limit }} {%- endif -%} diff --git a/tests/unit/test_base_adapter.py b/tests/unit/test_base_adapter.py index 5fa109b7..3d763710 100644 --- a/tests/unit/test_base_adapter.py +++ b/tests/unit/test_base_adapter.py @@ -4,6 +4,12 @@ from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport +from datetime import datetime +from unittest.mock import MagicMock, patch +import agate +import pytz +from dbt.adapters.contracts.connection import AdapterResponse + class TestBaseAdapterConstraintRendering: @pytest.fixture(scope="class") @@ -234,3 +240,145 @@ def test_render_raw_model_constraints_unsupported( rendered_constraints = 
BaseAdapter.render_raw_model_constraints(constraints) assert rendered_constraints == [] + + +class TestCalculateFreshnessFromCustomSQL: + @pytest.fixture + def adapter(self): + # Create mock config and context + config = MagicMock() + + # Create test adapter class that implements abstract methods + class TestAdapter(BaseAdapter): + def convert_boolean_type(self, *args, **kwargs): + return None + + def convert_date_type(self, *args, **kwargs): + return None + + def convert_datetime_type(self, *args, **kwargs): + return None + + def convert_number_type(self, *args, **kwargs): + return None + + def convert_text_type(self, *args, **kwargs): + return None + + def convert_time_type(self, *args, **kwargs): + return None + + def create_schema(self, *args, **kwargs): + return None + + def date_function(self, *args, **kwargs): + return None + + def drop_relation(self, *args, **kwargs): + return None + + def drop_schema(self, *args, **kwargs): + return None + + def expand_column_types(self, *args, **kwargs): + return None + + def get_columns_in_relation(self, *args, **kwargs): + return None + + def is_cancelable(self, *args, **kwargs): + return False + + def list_relations_without_caching(self, *args, **kwargs): + return [] + + def list_schemas(self, *args, **kwargs): + return [] + + def quote(self, *args, **kwargs): + return "" + + def rename_relation(self, *args, **kwargs): + return None + + def truncate_relation(self, *args, **kwargs): + return None + + return TestAdapter(config, MagicMock()) + + @pytest.fixture + def mock_relation(self): + mock = MagicMock() + mock.__str__ = lambda x: "test.table" + return mock + + @patch("dbt.adapters.base.BaseAdapter.execute_macro") + def test_calculate_freshness_from_customsql_success( + self, mock_execute_macro, adapter, mock_relation + ): + """Test successful freshness calculation from custom SQL""" + + # Setup test data + current_time = datetime.now(pytz.UTC) + last_modified = datetime(2023, 1, 1, tzinfo=pytz.UTC) + + # Create mock agate table with test data + mock_table = agate.Table.from_object( + [{"last_modified": last_modified, "snapshotted_at": current_time}] + ) + + # Configure mock execute_macro + mock_execute_macro.return_value = MagicMock( + response=AdapterResponse("SUCCESS"), table=mock_table + ) + + # Execute method under test + adapter_response, freshness_response = adapter.calculate_freshness_from_custom_sql( + source=mock_relation, sql="SELECT max(updated_at) as last_modified" + ) + + # Verify execute_macro was called correctly + mock_execute_macro.assert_called_once_with( + "collect_freshness_custom_sql", + kwargs={ + "source": mock_relation, + "loaded_at_query": "SELECT max(updated_at) as last_modified", + }, + macro_resolver=None, + ) + + # Verify adapter response + assert adapter_response._message == "SUCCESS" + + # Verify freshness response + assert freshness_response["max_loaded_at"] == last_modified + assert freshness_response["snapshotted_at"] == current_time + assert isinstance(freshness_response["age"], float) + + @patch("dbt.adapters.base.BaseAdapter.execute_macro") + def test_calculate_freshness_from_customsql_null_last_modified( + self, mock_execute_macro, adapter, mock_relation + ): + """Test freshness calculation when last_modified is NULL""" + + current_time = datetime.now(pytz.UTC) + + # Create mock table with NULL last_modified + mock_table = agate.Table.from_object( + [{"last_modified": None, "snapshotted_at": current_time}] + ) + + mock_execute_macro.return_value = MagicMock( + response=AdapterResponse("SUCCESS"), 
+            table=mock_table
+        )
+
+        # Execute method
+        _, freshness_response = adapter.calculate_freshness_from_custom_sql(
+            source=mock_relation, sql="SELECT max(updated_at) as last_modified"
+        )
+
+        # Verify NULL last_modified is handled by using datetime.min
+        expected_min_date = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
+        assert freshness_response["max_loaded_at"] == expected_min_date
+        assert freshness_response["snapshotted_at"] == current_time
+        assert isinstance(freshness_response["age"], float)
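
Taken together, the `impl.py` and `freshness.sql` changes above give adapters a second entry point for source freshness: instead of a `loaded_at_field`, the caller hands the adapter an arbitrary query. A minimal usage sketch, not part of this diff — the connection name and the `raw.events` query are illustrative assumptions:

```python
from dbt.adapters.base.impl import BaseAdapter
from dbt.adapters.base.relation import BaseRelation


def freshness_age_seconds(adapter: BaseAdapter, source: BaseRelation) -> float:
    # The collect_freshness_custom_sql macro wraps this query in a CTE and
    # selects its result alongside current_timestamp(), so it must return a
    # single timestamp-like value.
    loaded_at_query = "select max(updated_at) from raw.events"  # hypothetical
    with adapter.connection_named("source_freshness"):
        _, freshness = adapter.calculate_freshness_from_custom_sql(
            source=source,
            sql=loaded_at_query,
        )
    # FreshnessResponse carries max_loaded_at, snapshotted_at, and age (seconds).
    return freshness["age"]
```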
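The new test modules follow the usual dbt-tests-adapter pattern: shared base classes that a downstream adapter repo opts into by subclassing, overriding fixtures where its dialect differs. A sketch of what that wiring might look like in an adapter's functional test suite — the concrete class names and the `current_timestamp` override are hypothetical:

```python
import pytest

from dbt.tests.adapter.simple_snapshot.test_various_configs import (
    BaseSnapshotColumnNames,
    BaseSnapshotDbtValidToCurrent,
    BaseSnapshotMultiUniqueKey,
)
from dbt.tests.adapter.utils.test_source_freshness_custom_info import (
    BaseCalculateFreshnessMethod,
)


class TestSnapshotColumnNames(BaseSnapshotColumnNames):
    pass


class TestSnapshotDbtValidToCurrent(BaseSnapshotDbtValidToCurrent):
    pass


class TestSnapshotMultiUniqueKey(BaseSnapshotMultiUniqueKey):
    pass


class TestCalculateFreshness(BaseCalculateFreshnessMethod):
    @pytest.fixture(scope="class")
    def valid_sql(self) -> str:
        # Hypothetical override for a dialect that lacks now()
        return "select current_timestamp"
```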