Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-4343] Show warning in UI if scheduler is not running #5127

Merged
merged 1 commit into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

from sqlalchemy import Column, Index, Integer, String, and_, or_
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from sqlalchemy.orm.session import make_transient, Session
from typing import Optional

from airflow import configuration as conf
from airflow import executors, models
Expand Down Expand Up @@ -71,25 +72,54 @@ class BaseJob(Base, LoggingMixin):
Index('idx_job_state_heartbeat', state, latest_heartbeat),
)

heartrate = conf.getfloat('scheduler', 'JOB_HEARTBEAT_SEC')

def __init__(
self,
executor=executors.get_default_executor(),
heartrate=conf.getfloat('scheduler', 'JOB_HEARTBEAT_SEC'),
heartrate=None,
*args, **kwargs):
self.hostname = get_hostname()
self.executor = executor
self.executor_class = executor.__class__.__name__
self.start_date = timezone.utcnow()
self.latest_heartbeat = timezone.utcnow()
self.heartrate = heartrate
if heartrate is not None:
self.heartrate = heartrate
self.unixname = getpass.getuser()
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)

def is_alive(self):
@classmethod
@provide_session
def most_recent_job(cls, session: Session) -> Optional['BaseJob']:
"""
Return the most recent job of this type, if any, based on last
heartbeat received.

This method should be called on a subclass (i.e. on SchedulerJob) to
return jobs of that type.

:param session: Database session
:rtype: BaseJob or None
"""
return session.query(cls).order_by(cls.latest_heartbeat.desc()).limit(1).first()

def is_alive(self, grace_multiplier=2.1):
"""
Is this job currently alive.

We define alive as in a state of RUNNING, and having sent a heartbeat
within a multiple of the heartrate (default of 2.1)

:param grace_multiplier: multiplier of heartrate to require heart beat
within
:type grace_multiplier: number
:rtype: boolean
"""
return (
(timezone.utcnow() - self.latest_heartbeat).seconds <
(conf.getint('scheduler', 'JOB_HEARTBEAT_SEC') * 2.1)
self.state == State.RUNNING and
(timezone.utcnow() - self.latest_heartbeat).seconds < self.heartrate * grace_multiplier
)

@provide_session
Expand Down
23 changes: 22 additions & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ class SchedulerJob(BaseJob):
__mapper_args__ = {
'polymorphic_identity': 'SchedulerJob'
}
heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')

def __init__(
self,
Expand Down Expand Up @@ -336,7 +337,6 @@ def __init__(
self.do_pickle = do_pickle
super().__init__(*args, **kwargs)

self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
self.max_threads = conf.getint('scheduler', 'max_threads')

if log:
Expand All @@ -362,6 +362,27 @@ def _exit_gracefully(self, signum, frame):
self.processor_agent.end()
sys.exit(os.EX_OK)

def is_alive(self, grace_multiplier=None):
"""
Is this SchedulerJob alive?

We define alive as in a state of running and a heartbeat within the
threshold defined in the ``scheduler_health_check_threshold`` config
setting.

``grace_multiplier`` is accepted for compatibility with the parent class.

:rtype: boolean
"""
if grace_multiplier is not None:
# Accept the same behaviour as superclass
return super().is_alive(grace_multiplier=grace_multiplier)
scheduler_health_check_threshold = conf.getint('scheduler', 'scheduler_health_check_threshold')
return (
self.state == State.RUNNING and
(timezone.utcnow() - self.latest_heartbeat).seconds < scheduler_health_check_threshold
)

@provide_session
def manage_slas(self, dag, session=None):
"""
Expand Down
17 changes: 17 additions & 0 deletions airflow/macros/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ def ds_format(ds, input_format, output_format):
return datetime.strptime(ds, input_format).strftime(output_format)


def datetime_diff_for_humans(dt, since=None):
"""
Return a human-readable/approximate difference between two datetimes, or
one and now.

:param dt: The datetime to display the diff for
:type dt: datetime
:param since: When to display the date from. If ``None`` then the diff is
between ``dt`` and now.
:type since: None or datetime
:rtype: str
"""
import pendulum

return pendulum.instance(dt).diff_for_humans(since)


def _integrate_plugins():
"""Integrate plugins to the context"""
import sys
Expand Down
11 changes: 11 additions & 0 deletions airflow/www/templates/appbuilder/baselayout.html
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
<div class="row">
{% block messages %}
{% include 'appbuilder/flash.html' %}
{% if scheduler_job is defined and (not scheduler_job or not scheduler_job.is_alive()) %}
<div class="alert alert-warning">
<p>The scheduler does not appear to be running.
{% if scheduler_job %}
Last heartbeat was received <time title="{{ scheduler_job.latest_heartbeat.isoformat() }}" datetime="{{ scheduler_job.latest_heartbeat.isoformat() }}">{{ macros.datetime_diff_for_humans(scheduler_job.latest_heartbeat) }}</time>.
{% endif %}
</p>
<p>The DAGs list may not update, and new tasks will not be scheduled.</p>
</div>
{% endif %}
{% endblock %}
{% block content %}
{% endblock %}
Expand All @@ -70,6 +80,7 @@
// below variables are used in base.js
var hostName = '{{ hostname }}';
var csrfToken = '{{ csrf_token() }}';
$("time[title]").tooltip()
</script>
<script src="{{ url_for_asset('base.js') }}" type="text/javascript"></script>
{% endblock %}
44 changes: 26 additions & 18 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@
from flask_appbuilder.actions import action
from flask_appbuilder.models.sqla.filters import BaseFilter
from flask_babel import lazy_gettext
import lazy_object_proxy
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from sqlalchemy import func, or_, desc, and_, union_all
from sqlalchemy import or_, desc, and_, union_all
from wtforms import SelectField, validators

import airflow
Expand Down Expand Up @@ -150,6 +151,19 @@ def show_traceback(error):
class AirflowBaseView(BaseView):
route_base = ''

# Make our macros available to our UI templates too.
extra_args = {
'macros': airflow.macros,
}

def render_template(self, *args, **kwargs):
return super().render_template(
*args,
# Cache this at most once per request, not for the lifetime of the view instance
scheduler_job=lazy_object_proxy.Proxy(jobs.SchedulerJob.most_recent_job),
**kwargs
)


class Airflow(AirflowBaseView):
@expose('/health')
Expand All @@ -160,31 +174,25 @@ def health(self, session=None):
including metadatabase and scheduler.
"""

BJ = jobs.BaseJob
payload = {}
scheduler_health_check_threshold = timedelta(seconds=conf.getint('scheduler',
'scheduler_health_check_threshold'
))
payload = {
'metadatabase': {'status': 'unhealthy'}
}

latest_scheduler_heartbeat = None
scheduler_status = 'unhealthy'
payload['metadatabase'] = {'status': 'healthy'}
try:
latest_scheduler_heartbeat = session.query(func.max(BJ.latest_heartbeat)).\
filter(BJ.state == 'running', BJ.job_type == 'SchedulerJob').\
scalar()
scheduler_job = jobs.SchedulerJob.most_recent_job()

if scheduler_job:
latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat()
if scheduler_job.is_alive():
scheduler_status = 'healthy'
except Exception:
payload['metadatabase']['status'] = 'unhealthy'

if not latest_scheduler_heartbeat:
scheduler_status = 'unhealthy'
else:
if timezone.utcnow() - latest_scheduler_heartbeat <= scheduler_health_check_threshold:
scheduler_status = 'healthy'
else:
scheduler_status = 'unhealthy'

payload['scheduler'] = {'status': scheduler_status,
'latest_scheduler_heartbeat': str(latest_scheduler_heartbeat)}
'latest_scheduler_heartbeat': latest_scheduler_heartbeat}

return wwwutils.json_response(payload)

Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ def do_setup():
'iso8601>=0.1.12',
'json-merge-patch==0.2',
'jinja2>=2.10.1, <2.11.0',
'lazy_object_proxy~=1.3',
'markdown>=2.5.2, <3.0',
'pandas>=0.17.1, <1.0.0',
'pendulum==1.4.4',
Expand Down
38 changes: 36 additions & 2 deletions tests/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
# under the License.
#

import datetime
import unittest

from airflow import configuration
from airflow.jobs import BaseJob
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.db import create_session

configuration.load_test_config()

Expand All @@ -33,9 +36,9 @@ class TestJob(BaseJob):
'polymorphic_identity': 'TestJob'
}

def __init__(self, cb):
def __init__(self, cb, **kwargs):
self.cb = cb
super().__init__()
super().__init__(**kwargs)

def _execute(self):
return self.cb()
Expand Down Expand Up @@ -65,3 +68,34 @@ def abort():

self.assertEqual(job.state, State.FAILED)
self.assertIsNotNone(job.end_date)

def test_most_recent_job(self):

with create_session() as session:
old_job = self.TestJob(None, heartrate=10)
old_job.latest_heartbeat = old_job.latest_heartbeat - datetime.timedelta(seconds=20)
job = self.TestJob(None, heartrate=10)
session.add(job)
session.add(old_job)
session.flush()

self.assertEqual(
self.TestJob.most_recent_job(session=session),
job
)

session.rollback()

def test_is_alive(self):
job = self.TestJob(None, heartrate=10, state=State.RUNNING)
self.assertTrue(job.is_alive())

job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20)
self.assertTrue(job.is_alive())

job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=21)
self.assertFalse(job.is_alive())

job.state = State.SUCCESS
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
self.assertFalse(job.is_alive(), "Completed jobs even with recent heartbeat should not be alive")
14 changes: 14 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ def getboolean(section, key):
def tearDownClass(cls):
cls.patcher.stop()

def test_is_alive(self):
job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
self.assertTrue(job.is_alive())

job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20)
self.assertTrue(job.is_alive())

job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31)
self.assertFalse(job.is_alive())

job.state = State.SUCCESS
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
self.assertFalse(job.is_alive(), "Completed jobs even with recent heartbeat should not be alive")

def run_single_scheduler_loop_with_no_dags(self, dags_folder):
"""
Utility function that runs a single scheduler loop without actually
Expand Down
7 changes: 3 additions & 4 deletions tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def test_health(self):

self.assertEqual('healthy', resp_json['metadatabase']['status'])
self.assertEqual('healthy', resp_json['scheduler']['status'])
self.assertEqual(str(last_scheduler_heartbeat_for_testing_1),
self.assertEqual(last_scheduler_heartbeat_for_testing_1.isoformat(),
resp_json['scheduler']['latest_scheduler_heartbeat'])

self.session.query(BaseJob).\
Expand All @@ -408,7 +408,7 @@ def test_health(self):

self.assertEqual('healthy', resp_json['metadatabase']['status'])
self.assertEqual('unhealthy', resp_json['scheduler']['status'])
self.assertEqual(str(last_scheduler_heartbeat_for_testing_2),
self.assertEqual(last_scheduler_heartbeat_for_testing_2.isoformat(),
resp_json['scheduler']['latest_scheduler_heartbeat'])

self.session.query(BaseJob).\
Expand All @@ -429,8 +429,7 @@ def test_health(self):

self.assertEqual('healthy', resp_json['metadatabase']['status'])
self.assertEqual('unhealthy', resp_json['scheduler']['status'])
self.assertEqual('None',
resp_json['scheduler']['latest_scheduler_heartbeat'])
self.assertIsNone(None, resp_json['scheduler']['latest_scheduler_heartbeat'])

def test_home(self):
resp = self.client.get('home', follow_redirects=True)
Expand Down