Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add retry to SQL-based alerting celery task #10542

Merged
merged 8 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 46 additions & 27 deletions superset/tasks/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@
from retry.api import retry_call
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import chrome, firefox
from sqlalchemy.orm import Session
from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError
from werkzeug.http import parse_cookie

from superset import app, db, security_manager, thumbnail_cache
from superset.extensions import celery_app
from superset.models.alerts import Alert, AlertLog
from superset.models.core import Database
from superset.models.dashboard import Dashboard
from superset.models.schedules import (
EmailDeliveryType,
Expand All @@ -79,6 +80,7 @@
logger = logging.getLogger("tasks.email_reports")
logger.setLevel(logging.INFO)

stats_logger = current_app.config["STATS_LOGGER"]
EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"]
WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"]
WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"]
Expand Down Expand Up @@ -533,6 +535,9 @@ def schedule_email_report( # pylint: disable=unused-argument
name="alerts.run_query",
bind=True,
soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
autoretry_for=(NoSuchColumnError, ResourceClosedError,),
retry_kwargs={"max_retries": 5},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a todo and the link to the issue - to get rid of retry once the underlying issue is resolved

retry_backoff=True,
)
def schedule_alert_query( # pylint: disable=unused-argument
task: Task,
Expand All @@ -542,24 +547,33 @@ def schedule_alert_query( # pylint: disable=unused-argument
is_test_alert: Optional[bool] = False,
) -> None:
model_cls = get_scheduler_model(report_type)
dbsession = db.create_scoped_session()
schedule = dbsession.query(model_cls).get(schedule_id)

# The user may have disabled the schedule. If so, ignore this
if not schedule or not schedule.active:
logger.info("Ignoring deactivated alert")
return
try:
schedule = db.create_scoped_session().query(model_cls).get(schedule_id)

if report_type == ScheduleType.alert:
if is_test_alert and recipients:
deliver_alert(schedule.id, recipients)
# The user may have disabled the schedule. If so, ignore this
if not schedule or not schedule.active:
logger.info("Ignoring deactivated alert")
return

if run_alert_query(schedule.id, dbsession):
# deliver_dashboard OR deliver_slice
return
else:
raise RuntimeError("Unknown report type")
if report_type == ScheduleType.alert:
if is_test_alert and recipients:
deliver_alert(schedule.id, recipients)
return

if run_alert_query(
schedule.id, schedule.database_id, schedule.sql, schedule.label
):
# deliver_dashboard OR deliver_slice
return
else:
raise RuntimeError("Unknown report type")
except NoSuchColumnError as column_error:
stats_logger.incr("run_alert_task.failure.NoSuchColumnError")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the API follows a fixed pattern on statsd metrics: https://github.com/apache/incubator-superset/blob/master/superset/views/base_api.py#L173

So we use `<method>.error|success|init`. My take is that it's desirable to just incr an error counter (at the method level is granular enough) and also log the error with a more detailed message or exception

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case we want to differentiate between NoSuchColumnError & ResourceClosedError - still researching the error
s/failure/error - is a good idea
we should probably use snake case for the metric name

raise column_error
except ResourceClosedError as resource_error:
stats_logger.incr("run_alert_task.failure.ResourceClosedError")
raise resource_error


class AlertState:
Expand All @@ -569,7 +583,7 @@ class AlertState:


def deliver_alert(alert_id: int, recipients: Optional[str] = None) -> None:
alert = db.session.query(Alert).get(alert_id)
alert = db.create_scoped_session().query(Alert).get(alert_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting, @john-bodley submitted this: #10427

is this necessary because it's called by a celery task, and we want to make sure the session gets removed on task completion? What's the effect if this is called by a web request with proper session scope?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I'm actually going to revert this change and just use regular session. Both types of sessions cause errors with the underlying cause remaining unknown for now.


logging.info("Triggering alert: %s", alert)
img_data = None
Expand Down Expand Up @@ -618,51 +632,56 @@ def deliver_alert(alert_id: int, recipients: Optional[str] = None) -> None:
_deliver_email(recipients, deliver_as_group, subject, body, data, images)


def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
def run_alert_query(
alert_id: int, database_id: int, sql: str, label: str
) -> Optional[bool]:
"""
Execute alert.sql and return value if any rows are returned
"""
alert = db.session.query(Alert).get(alert_id)

logger.info("Processing alert ID: %i", alert.id)
database = alert.database
dbsession = db.create_scoped_session()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we call it db_session

logger.info("Processing alert ID: %i", alert_id)
database = dbsession.query(Database).get(database_id)
if not database:
logger.error("Alert database not preset")
return None

if not alert.sql:
if not sql:
logger.error("Alert SQL not preset")
return None

parsed_query = ParsedQuery(alert.sql)
parsed_query = ParsedQuery(sql)
sql = parsed_query.stripped()

state = None
dttm_start = datetime.utcnow()

df = pd.DataFrame()
try:
logger.info("Evaluating SQL for alert %s", alert)
logger.info("Evaluating SQL for alert <%s:%s>", alert_id, label)
df = database.get_df(sql)
except Exception as exc: # pylint: disable=broad-except
state = AlertState.ERROR
logging.exception(exc)
logging.error("Failed at evaluating alert: %s (%s)", alert.label, alert.id)
logging.error("Failed at evaluating alert: %s (%s)", label, alert_id)

dttm_end = datetime.utcnow()
last_eval_dttm = datetime.utcnow()

if state != AlertState.ERROR:
alert.last_eval_dttm = datetime.utcnow()
if not df.empty:
# Looking for truthy cells
for row in df.to_records():
if any(row):
state = AlertState.TRIGGER
deliver_alert(alert.id)
deliver_alert(alert_id)
break
if not state:
state = AlertState.PASS

dbsession.commit()
alert = dbsession.query(Alert).get(alert_id)
if state != AlertState.ERROR:
alert.last_eval_dttm = last_eval_dttm
alert.last_state = state
alert.logs.append(
AlertLog(
Expand Down
130 changes: 66 additions & 64 deletions tests/alerts_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,42 @@ def setup_database():
slice_id = db.session.query(Slice).all()[0].id
database_id = utils.get_example_database().id

alert1 = Alert(
id=1,
label="alert_1",
active=True,
crontab="*/1 * * * *",
sql="SELECT 0",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
)
alert2 = Alert(
id=2,
label="alert_2",
active=True,
crontab="*/1 * * * *",
sql="SELECT 55",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
)
alert3 = Alert(
id=3,
label="alert_3",
active=False,
crontab="*/1 * * * *",
sql="UPDATE 55",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
)
alert4 = Alert(id=4, active=False, label="alert_4", database_id=-1)
alert5 = Alert(id=5, active=False, label="alert_5", database_id=database_id)

for num in range(1, 6):
eval(f"db.session.add(alert{num})")
alerts = [
Alert(
id=1,
label="alert_1",
active=True,
crontab="*/1 * * * *",
sql="SELECT 0",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
),
Alert(
id=2,
label="alert_2",
active=True,
crontab="*/1 * * * *",
sql="SELECT 55",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
),
Alert(
id=3,
label="alert_3",
active=False,
crontab="*/1 * * * *",
sql="UPDATE 55",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
),
Alert(id=4, active=False, label="alert_4", database_id=-1),
Alert(id=5, active=False, label="alert_5", database_id=database_id),
]

db.session.bulk_save_objects(alerts)
db.session.commit()
yield db.session

Expand All @@ -82,45 +83,46 @@ def setup_database():

@patch("superset.tasks.schedules.deliver_alert")
@patch("superset.tasks.schedules.logging.Logger.error")
def test_run_alert_query(mock_error, mock_deliver, setup_database):
database = setup_database
run_alert_query(database.query(Alert).filter_by(id=1).one().id, database)
alert1 = database.query(Alert).filter_by(id=1).one()
assert mock_deliver.call_count == 0
assert len(alert1.logs) == 1
assert alert1.logs[0].alert_id == 1
assert alert1.logs[0].state == "pass"

run_alert_query(database.query(Alert).filter_by(id=2).one().id, database)
alert2 = database.query(Alert).filter_by(id=2).one()
assert mock_deliver.call_count == 1
assert len(alert2.logs) == 1
assert alert2.logs[0].alert_id == 2
assert alert2.logs[0].state == "trigger"

run_alert_query(database.query(Alert).filter_by(id=3).one().id, database)
alert3 = database.query(Alert).filter_by(id=3).one()
assert mock_deliver.call_count == 1
def test_run_alert_query(mock_error, mock_deliver_alert, setup_database):
dbsession = setup_database

# Test passing alert with null SQL result
alert1 = dbsession.query(Alert).filter_by(id=1).one()
run_alert_query(alert1.id, alert1.database_id, alert1.sql, alert1.label)
assert mock_deliver_alert.call_count == 0
assert mock_error.call_count == 0

# Test passing alert with True SQL result
alert2 = dbsession.query(Alert).filter_by(id=2).one()
run_alert_query(alert2.id, alert2.database_id, alert2.sql, alert2.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 0

# Test passing alert with error in SQL query
alert3 = dbsession.query(Alert).filter_by(id=3).one()
run_alert_query(alert3.id, alert3.database_id, alert3.sql, alert3.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 2
assert len(alert3.logs) == 1
assert alert3.logs[0].alert_id == 3
assert alert3.logs[0].state == "error"

run_alert_query(database.query(Alert).filter_by(id=4).one().id, database)
assert mock_deliver.call_count == 1
# Test passing alert with invalid database
alert4 = dbsession.query(Alert).filter_by(id=4).one()
run_alert_query(alert4.id, alert4.database_id, alert4.sql, alert4.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 3

run_alert_query(database.query(Alert).filter_by(id=5).one().id, database)
assert mock_deliver.call_count == 1
# Test passing alert with no SQL statement
alert5 = dbsession.query(Alert).filter_by(id=5).one()
run_alert_query(alert5.id, alert5.database_id, alert5.sql, alert5.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 4


@patch("superset.tasks.schedules.deliver_alert")
@patch("superset.tasks.schedules.run_alert_query")
def test_schedule_alert_query(mock_run_alert, mock_deliver_alert, setup_database):
database = setup_database
active_alert = database.query(Alert).filter_by(id=1).one()
inactive_alert = database.query(Alert).filter_by(id=3).one()
dbsession = setup_database
active_alert = dbsession.query(Alert).filter_by(id=1).one()
inactive_alert = dbsession.query(Alert).filter_by(id=3).one()

# Test that inactive alerts are not processed
schedule_alert_query(report_type=ScheduleType.alert, schedule_id=inactive_alert.id)
Expand Down