Skip to content

Commit

Permalink
support retryable exceptions during query execution (#368)
Browse files Browse the repository at this point in the history
Co-authored-by: VersusFacit <[email protected]>
Co-authored-by: Mike Alfare <[email protected]>
  • Loading branch information
3 people authored Dec 17, 2024
1 parent e3964d7 commit a8f34ae
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20241204-185912.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add retry logic for retryable exceptions.
time: 2024-12-04T18:59:12.48816-08:00
custom:
Author: 'colin-rogers-dbt '
Issue: "368"
65 changes: 63 additions & 2 deletions dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import abc
import time
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Tuple,
TYPE_CHECKING,
Type,
)

from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
Expand All @@ -18,6 +28,7 @@
SQLCommit,
SQLQuery,
SQLQueryStatus,
AdapterEventDebug,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -61,7 +72,50 @@ def add_query(
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
retryable_exceptions: Tuple[Type[Exception], ...] = tuple(),
retry_limit: int = 1,
) -> Tuple[Connection, Any]:
"""
Retry function encapsulated here to avoid commitment to some
user-facing interface. Right now, Redshift commits to a 1 second
retry timeout so this serves as a default.
"""

def _execute_query_with_retry(
cursor: Any,
sql: str,
bindings: Optional[Any],
retryable_exceptions: Tuple[Type[Exception], ...],
retry_limit: int,
attempt: int,
):
"""
A success sees the try exit cleanly and avoid any recursive
retries. Failure begins a sleep and retry routine.
"""
try:
cursor.execute(sql, bindings)
except retryable_exceptions as e:
# Cease retries and fail when limit is hit.
if attempt >= retry_limit:
raise e

fire_event(
AdapterEventDebug(
message=f"Got a retryable error {type(e)}. {retry_limit-attempt} retries left. Retrying in 1 second.\nError:\n{e}"
)
)
time.sleep(1)

return _execute_query_with_retry(
cursor=cursor,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
attempt=attempt + 1,
)

connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()
Expand Down Expand Up @@ -90,7 +144,14 @@ def add_query(
pre = time.perf_counter()

cursor = connection.handle.cursor()
cursor.execute(sql, bindings)
_execute_query_with_retry(
cursor=cursor,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
attempt=1,
)

result = self.get_response(cursor)

Expand Down

0 comments on commit a8f34ae

Please sign in to comment.