Skip to content

Commit

Permalink
fix(embedded): Retry when executing alert queries to avoid sending tr…
Browse files Browse the repository at this point in the history
…ansient errors to users as alert failure notifications (#20419)

Co-authored-by: Rui Zhao <[email protected]>
  • Loading branch information
zhaorui2022 and Rui Zhao authored Jul 5, 2022
1 parent 662bab1 commit 818962c
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 1 deletion.
3 changes: 3 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,9 @@ def SQL_QUERY_MUTATOR( # pylint: disable=invalid-name,unused-argument
# If set to true no notification is sent, the worker will just log a message.
# Useful for debugging
ALERT_REPORTS_NOTIFICATION_DRY_RUN = False
# Max tries to run queries to prevent false errors caused by transient errors
# being returned to users. Set to a value >1 to enable retries.
ALERT_REPORTS_QUERY_EXECUTION_MAX_TRIES = 1

# A custom prefix to use on all Alerts & Reports emails
EMAIL_REPORTS_SUBJECT_PREFIX = "[Report] "
Expand Down
9 changes: 8 additions & 1 deletion superset/reports/commands/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
AlertValidatorConfigError,
)
from superset.utils.core import override_user
from superset.utils.retries import retry_call

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -171,7 +172,13 @@ def validate(self) -> None:
"""
Validate the query result as a Pandas DataFrame
"""
df = self._execute_query()
# When there are transient errors when executing queries, users will get
# notified with the error stacktrace which can be avoided by retrying
df = retry_call(
self._execute_query,
exception=AlertQueryError,
max_tries=app.config["ALERT_REPORTS_QUERY_EXECUTION_MAX_TRIES"],
)

if df.empty and self._is_validator_not_null:
self._result = None
Expand Down
119 changes: 119 additions & 0 deletions tests/integration_tests/reports/alert_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=invalid-name, unused-argument, import-outside-toplevel

import pandas as pd
from pytest_mock import MockFixture


def test_execute_query_succeeded_no_retry(
mocker: MockFixture, app_context: None
) -> None:

from superset.reports.commands.alert import AlertCommand

execute_query_mock = mocker.patch(
"superset.reports.commands.alert.AlertCommand._execute_query",
side_effect=lambda: pd.DataFrame([{"sample_col": 0}]),
)

command = AlertCommand(report_schedule=mocker.Mock())

command.validate()

assert execute_query_mock.call_count == 1


def test_execute_query_succeeded_with_retries(
mocker: MockFixture, app_context: None
) -> None:
from superset.reports.commands.alert import AlertCommand, AlertQueryError

execute_query_mock = mocker.patch(
"superset.reports.commands.alert.AlertCommand._execute_query"
)

query_executed_count = 0
# Should match the value defined in superset_test_config.py
expected_max_retries = 3

def _mocked_execute_query() -> pd.DataFrame:
nonlocal query_executed_count
query_executed_count += 1

if query_executed_count < expected_max_retries:
raise AlertQueryError()
else:
return pd.DataFrame([{"sample_col": 0}])

execute_query_mock.side_effect = _mocked_execute_query
execute_query_mock.__name__ = "mocked_execute_query"

command = AlertCommand(report_schedule=mocker.Mock())

command.validate()

assert execute_query_mock.call_count == expected_max_retries


def test_execute_query_failed_no_retry(mocker: MockFixture, app_context: None) -> None:
from superset.reports.commands.alert import AlertCommand, AlertQueryTimeout

execute_query_mock = mocker.patch(
"superset.reports.commands.alert.AlertCommand._execute_query"
)

def _mocked_execute_query() -> None:
raise AlertQueryTimeout

execute_query_mock.side_effect = _mocked_execute_query
execute_query_mock.__name__ = "mocked_execute_query"

command = AlertCommand(report_schedule=mocker.Mock())

try:
command.validate()
except AlertQueryTimeout:
pass

assert execute_query_mock.call_count == 1


def test_execute_query_failed_max_retries(
mocker: MockFixture, app_context: None
) -> None:
from superset.reports.commands.alert import AlertCommand, AlertQueryError

execute_query_mock = mocker.patch(
"superset.reports.commands.alert.AlertCommand._execute_query"
)

def _mocked_execute_query() -> None:
raise AlertQueryError

execute_query_mock.side_effect = _mocked_execute_query
execute_query_mock.__name__ = "mocked_execute_query"

command = AlertCommand(report_schedule=mocker.Mock())

try:
command.validate()
except AlertQueryError:
pass

# Should match the value defined in superset_test_config.py
assert execute_query_mock.call_count == 3
2 changes: 2 additions & 0 deletions tests/integration_tests/superset_test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ def GET_FEATURE_FLAGS_FUNC(ff):

ALERT_REPORTS_WORKING_TIME_OUT_KILL = True

ALERT_REPORTS_QUERY_EXECUTION_MAX_TRIES = 3


class CeleryConfig(object):
BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_CELERY_DB}"
Expand Down

0 comments on commit 818962c

Please sign in to comment.