From 477d74718adb9b3d68621910af061891b9bdcc17 Mon Sep 17 00:00:00 2001 From: majorosdonat Date: Tue, 15 Oct 2024 17:50:56 +0200 Subject: [PATCH] Add retry on error 502 and 504 (#42994) * Add retry on error 502 and 504 * fix mypy findings * Add pytest * Convert response code to HTTPStatus * Add docs to retriable exception * extend docs for AirflowHttpException * Fix syntax and typos * fix pytest * fix static checks * fix some static checks * Fix ruff * fix pre-commit --------- Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) --- airflow/api_internal/internal_api_call.py | 33 ++++++++++++++++++-- tests/api_internal/test_internal_api_call.py | 23 ++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 8838377877bec..064834d7c8673 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -21,6 +21,7 @@ import json import logging from functools import wraps +from http import HTTPStatus from typing import Callable, TypeVar from urllib.parse import urlparse @@ -40,6 +41,14 @@ logger = logging.getLogger(__name__) +class AirflowHttpException(AirflowException): + """Raise when there is a problem during an http request on the internal API decorator.""" + + def __init__(self, message: str, status_code: HTTPStatus): + super().__init__(message) + self.status_code = status_code + + class InternalApiConfig: """Stores and caches configuration for Internal API.""" @@ -105,10 +114,27 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: """ from requests.exceptions import ConnectionError + def _is_retryable_exception(exception: BaseException) -> bool: + """ + Evaluate which exception types to retry. 
+ + This is especially needed for cases where an application gateway or Kubernetes ingress cannot + find a running instance of a webserver hosting the API (HTTP 502+504) or when the + HTTP request fails in general at the network level. + + Note that we want to fail on other general errors on the webserver so that we do not send bad requests in an endless loop. + """ + retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT) + return ( + isinstance(exception, AirflowHttpException) + and exception.status_code in retryable_status_codes + or isinstance(exception, (ConnectionError, NewConnectionError)) + ) + @tenacity.retry( stop=tenacity.stop_after_attempt(10), wait=tenacity.wait_exponential(min=1), - retry=tenacity.retry_if_exception_type((NewConnectionError, ConnectionError)), + retry=tenacity.retry_if_exception(_is_retryable_exception), before_sleep=tenacity.before_log(logger, logging.WARNING), ) def make_jsonrpc_request(method_name: str, params_json: str) -> bytes: @@ -126,9 +152,10 @@ def make_jsonrpc_request(method_name: str, params_json: str) -> bytes: internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint() response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers) if response.status_code != 200: - raise AirflowException( + raise AirflowHttpException( f"Got {response.status_code}:{response.reason} when sending " - f"the internal api request: {response.text}" + f"the internal api request: {response.text}", + HTTPStatus(response.status_code), ) return response.content diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py index 880e1e89b0388..ff9c01e3b554d 100644 --- a/tests/api_internal/test_internal_api_call.py +++ b/tests/api_internal/test_internal_api_call.py @@ -25,6 +25,7 @@ import pytest import requests +from tenacity import RetryError from airflow.__main__ import configure_internal_api from airflow.api_internal.internal_api_call import 
InternalApiConfig, internal_api_call @@ -266,6 +267,28 @@ def test_remote_classmethod_call_with_params(self, mock_requests): assert call_kwargs["headers"]["Content-Type"] == "application/json" assert "Authorization" in call_kwargs["headers"] + @conf_vars( + { + ("core", "database_access_isolation"): "true", + ("core", "internal_api_url"): "http://localhost:8888", + ("database", "sql_alchemy_conn"): "none://", + } + ) + @mock.patch("airflow.api_internal.internal_api_call.requests") + @mock.patch("tenacity.time.sleep") + def test_retry_on_bad_gateway(self, mock_sleep, mock_requests): + configure_internal_api(Namespace(subcommand="dag-processor"), conf) + response = requests.Response() + response.status_code = 502 + response.reason = "Bad Gateway" + response._content = b"Bad Gateway" + + mock_sleep = lambda *_, **__: None # noqa: F841 + mock_requests.post.return_value = response + with pytest.raises(RetryError): + TestInternalApiCall.fake_method_with_params("fake-dag", task_id=123, session="session") + assert mock_requests.post.call_count == 10 + @conf_vars( { ("core", "database_access_isolation"): "true",