diff --git a/.changes/unreleased/Under the Hood-20241204-185912.yaml b/.changes/unreleased/Under the Hood-20241204-185912.yaml new file mode 100644 index 00000000..5c731703 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20241204-185912.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Add retry logic for retryable exceptions. +time: 2024-12-04T18:59:12.48816-08:00 +custom: + Author: 'colin-rogers-dbt ' + Issue: "368" diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index baccddc9..04b5e401 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -1,6 +1,16 @@ import abc import time -from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + TYPE_CHECKING, + Type, +) from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event @@ -18,6 +28,7 @@ SQLCommit, SQLQuery, SQLQueryStatus, + AdapterEventDebug, ) if TYPE_CHECKING: @@ -61,7 +72,50 @@ def add_query( auto_begin: bool = True, bindings: Optional[Any] = None, abridge_sql_log: bool = False, + retryable_exceptions: Tuple[Type[Exception], ...] = tuple(), + retry_limit: int = 1, ) -> Tuple[Connection, Any]: + """ + Retry function encapsulated here to avoid commitment to some + user-facing interface. Right now, Redshift commits to a 1 second + retry timeout so this serves as a default. + """ + + def _execute_query_with_retry( + cursor: Any, + sql: str, + bindings: Optional[Any], + retryable_exceptions: Tuple[Type[Exception], ...], + retry_limit: int, + attempt: int, + ): + """ + A success sees the try exit cleanly and avoid any recursive + retries. Failure begins a sleep and retry routine. + """ + try: + cursor.execute(sql, bindings) + except retryable_exceptions as e: + # Cease retries and fail when limit is hit. + if attempt >= retry_limit: + raise e + + fire_event( + AdapterEventDebug( + message=f"Got a retryable error {type(e)}. {retry_limit-attempt} retries left. Retrying in 1 second.\nError:\n{e}" + ) + ) + time.sleep(1) + + return _execute_query_with_retry( + cursor=cursor, + sql=sql, + bindings=bindings, + retryable_exceptions=retryable_exceptions, + retry_limit=retry_limit, + attempt=attempt + 1, + ) + connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: self.begin() @@ -90,7 +144,14 @@ def add_query( pre = time.perf_counter() cursor = connection.handle.cursor() - cursor.execute(sql, bindings) + _execute_query_with_retry( + cursor=cursor, + sql=sql, + bindings=bindings, + retryable_exceptions=retryable_exceptions, + retry_limit=retry_limit, + attempt=1, + ) result = self.get_response(cursor)