Merge branch 'fix_null_equality_110' of https://github.com/dbt-labs/dbt-adapters into fix_null_equality_110
adrianburusdbt committed Dec 19, 2024
2 parents e8843fb + cc9fb55 commit 6fd7240
Showing 14 changed files with 412 additions and 59 deletions.
10 changes: 10 additions & 0 deletions .changes/1.11.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
## dbt-adapters 1.11.0 - December 17, 2024

### Features

- Add new hard_deletes="new_record" mode for snapshots. ([#317](https://github.com/dbt-labs/dbt-adapters/issues/317))
- Introduce new Capability for MicrobatchConcurrency support ([#359](https://github.com/dbt-labs/dbt-adapters/issues/359))

### Under the Hood

- Add retry logic for retryable exceptions. ([#368](https://github.com/dbt-labs/dbt-adapters/issues/368))
1 change: 1 addition & 0 deletions .changes/1.12.0.md
@@ -0,0 +1 @@
## dbt-adapters 1.12.0 - December 18, 2024
6 changes: 0 additions & 6 deletions .changes/unreleased/Features-20241104-120653.yaml

This file was deleted.

6 changes: 0 additions & 6 deletions .changes/unreleased/Features-20241120-112806.yaml

This file was deleted.

6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241216-172047.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Add function to run custom sql for getting freshness info
time: 2024-12-16T17:20:47.065611-08:00
custom:
Author: ChenyuLInx
Issue: "8797"
3 changes: 3 additions & 0 deletions .github/workflows/_generate-changelog.yml
@@ -198,6 +198,9 @@ jobs:
if: ${{ needs.temp-branch.outputs.name != '' && inputs.merge }}
runs-on: ${{ vars.DEFAULT_RUNNER }}
steps:
- uses: actions/checkout@v4
with:
ref: ${{ needs.temp-branch.outputs.name }}
- uses: everlytic/[email protected]
with:
source_ref: ${{ needs.temp-branch.outputs.name }}
44 changes: 39 additions & 5 deletions .github/workflows/publish.yml
@@ -58,11 +58,45 @@ jobs:
branch: ${{ needs.generate-changelog.outputs.branch-name }}
secrets: inherit

publish-pypi:
package:
if: ${{ inputs.pypi-public == true }}
needs: generate-changelog
uses: ./.github/workflows/_publish-pypi.yml
uses: ./.github/workflows/_package-directory.yml
with:
package: ${{ inputs.package }}
deploy-to: ${{ inputs.deploy-to }}
branch: ${{ needs.generate-changelog.outputs.branch-name }}

publish-pypi:
if: ${{ inputs.pypi-public == true }}
needs: [package, generate-changelog]
runs-on: ${{ vars.DEFAULT_RUNNER }}
environment:
name: ${{ inputs.deploy-to }}
url: ${{ vars.PYPI_PROJECT_URL }}/${{ inputs.package }}
permissions:
# this permission is required for trusted publishing
# see https://github.com/marketplace/actions/pypi-publish
id-token: write
steps:
- uses: actions/checkout@v4
with:
ref: ${{ needs.generate-changelog.outputs.branch-name }}
- uses: actions/setup-python@v5
with:
python-version: ${{ vars.DEFAULT_PYTHON_VERSION }}
- uses: pypa/hatch@install
# hatch will build using test PyPI first and fall back to prod PyPI when deploying to test
# this is done via environment variables in the test environment in GitHub
- run: hatch build && hatch run build:check-all
working-directory: ./${{ needs.package.outputs.directory }}
- uses: pypa/gh-action-pypi-publish@release/v1
with:
repository-url: ${{ vars.PYPI_REPOSITORY_URL }}
packages-dir: ./${{ needs.package.outputs.directory }}dist/
- id: version
run: echo "version=$(hatch version)" >> $GITHUB_OUTPUT
working-directory: ./${{ needs.package.outputs.directory }}
- uses: nick-fields/retry@v3
with:
timeout_seconds: 10
retry_wait_seconds: 10
max_attempts: 15 # 5 minutes: (10s timeout + 10s delay) * 15 attempts
command: wget ${{ vars.PYPI_PROJECT_URL }}/${{ steps.version.outputs.version }}
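The final step's arithmetic — 15 attempts of a 10-second timeout plus a 10-second delay, roughly 5 minutes — is a bounded poll against the package index before declaring the release visible. A minimal Python sketch of that gate, with a fake fetcher standing in for `wget` and the delays zeroed so it runs instantly (all names here are illustrative, not part of the workflow):

```python
# Sketch of the workflow's release gate: poll until the package index
# serves the new version's page, giving up after max_attempts.
# In the workflow: (10 s timeout + 10 s delay) * 15 attempts ~ 5 minutes.
import time
from typing import Callable


def wait_for_release(
    fetch: Callable[[], bool], max_attempts: int = 15, delay: float = 0.0
) -> bool:
    for attempt in range(1, max_attempts + 1):
        if fetch():  # stands in for `wget $PYPI_PROJECT_URL/$VERSION`
            return True
        if attempt < max_attempts:
            time.sleep(delay)
    return False


# Simulate an index that starts serving the page on the third poll.
calls = {"n": 0}

def fake_fetch() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_for_release(fake_fetch))  # True, after 3 polls
```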
17 changes: 15 additions & 2 deletions CHANGELOG.md
@@ -5,6 +5,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html),
and is generated by [Changie](https://github.com/miniscruff/changie).

## dbt-adapters 1.12.0 - December 18, 2024



## dbt-adapters 1.11.0 - December 17, 2024

### Features

- Add new hard_deletes="new_record" mode for snapshots. ([#317](https://github.com/dbt-labs/dbt-adapters/issues/317))
- Introduce new Capability for MicrobatchConcurrency support ([#359](https://github.com/dbt-labs/dbt-adapters/issues/359))

### Under the Hood

- Add retry logic for retryable exceptions. ([#368](https://github.com/dbt-labs/dbt-adapters/issues/368))

## dbt-adapters 1.10.4 - November 11, 2024

### Features
@@ -37,8 +52,6 @@ and is generated by [Changie](https://github.com/miniscruff/changie).
- Negate the check for microbatch behavior flag in determining builtins ([#349](https://github.com/dbt-labs/dbt-adapters/issues/349))
- Move require_batched_execution_for_custom_microbatch_strategy flag to global ([#351](https://github.com/dbt-labs/dbt-adapters/issues/351))



## dbt-adapters 1.8.0 - October 29, 2024

### Fixes
@@ -0,0 +1,70 @@
from typing import Type
from unittest.mock import MagicMock

from dbt_common.exceptions import DbtRuntimeError
import pytest

from dbt.adapters.base.impl import BaseAdapter


class BaseCalculateFreshnessMethod:
"""Tests the behavior of the calculate_freshness_from_custom_sql method for the relevant adapters.
The base method is meant to throw the appropriate custom exception when calculate_freshness_from_custom_sql
fails.
"""

@pytest.fixture(scope="class")
def valid_sql(self) -> str:
"""Returns a valid statement for issuing as a validate_sql query.
Ideally this would be checkable for non-execution. For example, we could use a
CREATE TABLE statement with an assertion that no table was created. However,
for most adapter types this is unnecessary - the EXPLAIN keyword has exactly the
behavior we want, and here we are essentially testing to make sure it is
supported. As such, we return a simple SELECT query, and leave it to
engine-specific test overrides to specify more detailed behavior as appropriate.
"""

return "select now()"

@pytest.fixture(scope="class")
def invalid_sql(self) -> str:
"""Returns an invalid statement for issuing a bad validate_sql query."""

return "Let's run some invalid SQL and see if we get an error!"

@pytest.fixture(scope="class")
def expected_exception(self) -> Type[Exception]:
"""Returns the Exception type thrown by a failed query.
Defaults to dbt_common.exceptions.DbtRuntimeError because that is the most common
base exception for adapters to throw."""
return DbtRuntimeError

@pytest.fixture(scope="class")
def mock_relation(self):
mock = MagicMock()
mock.__str__ = lambda x: "test.table"
return mock

def test_calculate_freshness_from_custom_sql_success(
self, adapter: BaseAdapter, valid_sql: str, mock_relation
) -> None:
with adapter.connection_named("test_freshness_custom_sql"):
adapter.calculate_freshness_from_custom_sql(mock_relation, valid_sql)

def test_calculate_freshness_from_custom_sql_failure(
self,
adapter: BaseAdapter,
invalid_sql: str,
expected_exception: Type[Exception],
mock_relation,
) -> None:
with pytest.raises(expected_exception=expected_exception):
with adapter.connection_named("test_infreshness_custom_sql"):
adapter.calculate_freshness_from_custom_sql(mock_relation, invalid_sql)


class TestCalculateFreshnessMethod(BaseCalculateFreshnessMethod):
pass
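The base class above is designed to be subclassed per adapter, overriding only the fixtures whose defaults do not fit the engine. A plain-Python sketch of that override pattern — fixtures simplified to ordinary methods so the sketch runs without pytest, and the subclass entirely hypothetical:

```python
# Sketch of the per-adapter override pattern the base suite enables.
# The real suite uses @pytest.fixture(scope="class") methods and
# dbt_common.exceptions.DbtRuntimeError; both are simplified here.
from typing import Type


class BaseCalculateFreshnessSketch:
    def valid_sql(self) -> str:
        return "select now()"

    def invalid_sql(self) -> str:
        return "Let's run some invalid SQL and see if we get an error!"

    def expected_exception(self) -> Type[Exception]:
        # Stand-in for DbtRuntimeError, the most common adapter base exception.
        return RuntimeError


class HypotheticalEngineOverride(BaseCalculateFreshnessSketch):
    # Override only what differs: this engine spells "current time" differently.
    def valid_sql(self) -> str:
        return "select current_timestamp()"


print(HypotheticalEngineOverride().valid_sql())  # select current_timestamp()
```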
2 changes: 1 addition & 1 deletion dbt/adapters/__about__.py
@@ -1 +1 @@
version = "1.10.4"
version = "1.12.0"
77 changes: 40 additions & 37 deletions dbt/adapters/base/impl.py
@@ -97,6 +97,7 @@
GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
FRESHNESS_MACRO_NAME = "collect_freshness"
CUSTOM_SQL_FRESHNESS_MACRO_NAME = "collect_freshness_custom_sql"
GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified"
DEFAULT_BASE_BEHAVIOR_FLAGS = [
{
@@ -1327,6 +1328,31 @@ def cancel_open_connections(self):
"""Cancel all open connections."""
return self.connections.cancel_open()

def _process_freshness_execution(
self,
macro_name: str,
kwargs: Dict[str, Any],
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
"""Execute and process a freshness macro to generate a FreshnessResponse"""
import agate

result = self.execute_macro(macro_name, kwargs=kwargs, macro_resolver=macro_resolver)

if isinstance(result, agate.Table):
warn_or_error(CollectFreshnessReturnSignature())
table = result
adapter_response = None
else:
adapter_response, table = result.response, result.table

# Process the results table
if len(table) != 1 or len(table[0]) != 2:
raise MacroResultError(macro_name, table)

freshness_response = self._create_freshness_response(table[0][0], table[0][1])
return adapter_response, freshness_response

def calculate_freshness(
self,
source: BaseRelation,
@@ -1335,49 +1361,26 @@ def calculate_freshness(
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
"""Calculate the freshness of sources in dbt, and return it"""
import agate

kwargs: Dict[str, Any] = {
kwargs = {
"source": source,
"loaded_at_field": loaded_at_field,
"filter": filter,
}
return self._process_freshness_execution(FRESHNESS_MACRO_NAME, kwargs, macro_resolver)

# run the macro
# in older versions of dbt-core, the 'collect_freshness' macro returned the table of results directly
# starting in v1.5, by default, we return both the table and the adapter response (metadata about the query)
result: Union[
AttrDict, # current: contains AdapterResponse + "agate.Table"
"agate.Table", # previous: just table
]
result = self.execute_macro(
FRESHNESS_MACRO_NAME, kwargs=kwargs, macro_resolver=macro_resolver
)
if isinstance(result, agate.Table):
warn_or_error(CollectFreshnessReturnSignature())
adapter_response = None
table = result
else:
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
# now we have a 1-row table of the maximum `loaded_at_field` value and
# the current time according to the db.
if len(table) != 1 or len(table[0]) != 2:
raise MacroResultError(FRESHNESS_MACRO_NAME, table)
if table[0][0] is None:
# no records in the table, so really the max_loaded_at was
# infinitely long ago. Just call it 0:00 January 1 year UTC
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(table[0][0], source, loaded_at_field)

snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
def calculate_freshness_from_custom_sql(
self,
source: BaseRelation,
sql: str,
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs = {
"source": source,
"loaded_at_query": sql,
}
return adapter_response, freshness
return self._process_freshness_execution(
CUSTOM_SQL_FRESHNESS_MACRO_NAME, kwargs, macro_resolver
)

def calculate_freshness_from_metadata_batch(
self,
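The inline math deleted from `calculate_freshness` (and presumably still performed by `_create_freshness_response`) turns the 1-row result table — max `loaded_at` plus the database's current time — into a freshness payload. A stdlib sketch of that computation, with the helper name illustrative rather than the adapter's actual API:

```python
# Minimal sketch of the freshness math the removed inline code performed:
# a NULL max(loaded_at) means the table had no rows, so the last load is
# treated as infinitely long ago (year 1, midnight, UTC).
from datetime import datetime, timezone
from typing import Optional


def freshness_response(
    max_loaded_at: Optional[datetime], snapshotted_at: datetime
) -> dict:
    if max_loaded_at is None:
        max_loaded_at = datetime(1, 1, 1, tzinfo=timezone.utc)
    age = (snapshotted_at - max_loaded_at).total_seconds()
    return {
        "max_loaded_at": max_loaded_at,
        "snapshotted_at": snapshotted_at,
        "age": age,
    }


snap = datetime(2024, 12, 17, 12, 0, tzinfo=timezone.utc)
loaded = datetime(2024, 12, 17, 11, 0, tzinfo=timezone.utc)
print(freshness_response(loaded, snap)["age"])  # 3600.0
```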
65 changes: 63 additions & 2 deletions dbt/adapters/sql/connections.py
@@ -1,6 +1,16 @@
import abc
import time
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Tuple,
TYPE_CHECKING,
Type,
)

from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
@@ -18,6 +28,7 @@
SQLCommit,
SQLQuery,
SQLQueryStatus,
AdapterEventDebug,
)

if TYPE_CHECKING:
@@ -61,7 +72,50 @@ def add_query(
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
retryable_exceptions: Tuple[Type[Exception], ...] = tuple(),
retry_limit: int = 1,
) -> Tuple[Connection, Any]:
"""
Retry function encapsulated here to avoid commitment to some
user-facing interface. Right now, Redshift commits to a 1 second
retry timeout so this serves as a default.
"""

def _execute_query_with_retry(
cursor: Any,
sql: str,
bindings: Optional[Any],
retryable_exceptions: Tuple[Type[Exception], ...],
retry_limit: int,
attempt: int,
):
"""
A success sees the try exit cleanly and avoid any recursive
retries. Failure begins a sleep and retry routine.
"""
try:
cursor.execute(sql, bindings)
except retryable_exceptions as e:
# Cease retries and fail when limit is hit.
if attempt >= retry_limit:
raise e

fire_event(
AdapterEventDebug(
message=f"Got a retryable error {type(e)}. {retry_limit-attempt} retries left. Retrying in 1 second.\nError:\n{e}"
)
)
time.sleep(1)

return _execute_query_with_retry(
cursor=cursor,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
attempt=attempt + 1,
)

connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()
@@ -90,7 +144,14 @@ def add_query(
pre = time.perf_counter()

cursor = connection.handle.cursor()
cursor.execute(sql, bindings)
_execute_query_with_retry(
cursor=cursor,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
attempt=1,
)

result = self.get_response(cursor)

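The recursive retry helper added to `add_query` can be exercised in isolation. A standalone sketch with a toy cursor, mirroring the diff's control flow (fail until the attempt counter reaches `retry_limit`, then re-raise) but with the fixed 1-second sleep made configurable so the example runs instantly:

```python
# Standalone sketch of the recursive retry pattern from add_query, using
# a toy cursor so it runs outside dbt. The real code sleeps 1 second and
# fires an AdapterEventDebug between attempts; both are elided here.
import time
from typing import Any, Optional, Tuple, Type


def execute_with_retry(
    cursor: Any,
    sql: str,
    bindings: Optional[Any],
    retryable_exceptions: Tuple[Type[Exception], ...],
    retry_limit: int,
    attempt: int = 1,
    delay: float = 0.0,
):
    try:
        return cursor.execute(sql, bindings)
    except retryable_exceptions:
        if attempt >= retry_limit:
            raise  # out of retries: surface the original error
        time.sleep(delay)
        return execute_with_retry(
            cursor, sql, bindings, retryable_exceptions,
            retry_limit, attempt + 1, delay,
        )


class FlakyCursor:
    """Fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    def execute(self, sql, bindings=None):
        self.calls += 1
        if self.calls <= self.failures:
            raise TimeoutError("transient")
        return "ok"


cursor = FlakyCursor(failures=2)
print(execute_with_retry(cursor, "select 1", None, (TimeoutError,), retry_limit=3))
# prints "ok" after two retried failures
```

Note the limit semantics this mirrors: with `retry_limit=1` (the diff's default), the first failure already satisfies `attempt >= retry_limit`, so no retry happens.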