diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py index a424e1dc8bfee..9e5f7e8bb81f9 100644 --- a/airflow/jobs/base_job.py +++ b/airflow/jobs/base_job.py @@ -23,7 +23,8 @@ from sqlalchemy import Column, Index, Integer, String, and_, or_ from sqlalchemy.exc import OperationalError -from sqlalchemy.orm.session import make_transient +from sqlalchemy.orm.session import make_transient, Session +from typing import Optional from airflow import configuration as conf from airflow import executors, models @@ -71,25 +72,54 @@ class BaseJob(Base, LoggingMixin): Index('idx_job_state_heartbeat', state, latest_heartbeat), ) + heartrate = conf.getfloat('scheduler', 'JOB_HEARTBEAT_SEC') + def __init__( self, executor=executors.get_default_executor(), - heartrate=conf.getfloat('scheduler', 'JOB_HEARTBEAT_SEC'), + heartrate=None, *args, **kwargs): self.hostname = get_hostname() self.executor = executor self.executor_class = executor.__class__.__name__ self.start_date = timezone.utcnow() self.latest_heartbeat = timezone.utcnow() - self.heartrate = heartrate + if heartrate is not None: + self.heartrate = heartrate self.unixname = getpass.getuser() self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query') super().__init__(*args, **kwargs) - def is_alive(self): + @classmethod + @provide_session + def most_recent_job(cls, session: Session) -> Optional['BaseJob']: + """ + Return the most recent job of this type, if any, based on last + heartbeat received. + + This method should be called on a subclass (i.e. on SchedulerJob) to + return jobs of that type. + + :param session: Database session + :rtype: BaseJob or None + """ + return session.query(cls).order_by(cls.latest_heartbeat.desc()).limit(1).first() + + def is_alive(self, grace_multiplier=2.1): + """ + Is this job currently alive. 
+ + We define alive as in a state of RUNNING, and having sent a heartbeat + within a multiple of the heartrate (default of 2.1) + + :param grace_multiplier: multiplier of heartrate to require heart beat + within + :type grace_multiplier: number + :rtype: boolean + """ return ( - (timezone.utcnow() - self.latest_heartbeat).seconds < - (conf.getint('scheduler', 'JOB_HEARTBEAT_SEC') * 2.1) + self.state == State.RUNNING and + (timezone.utcnow() - self.latest_heartbeat).total_seconds() < self.heartrate * grace_multiplier ) @provide_session diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index ccce9f3381660..6b8ef69a15836 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -293,6 +293,7 @@ class SchedulerJob(BaseJob): __mapper_args__ = { 'polymorphic_identity': 'SchedulerJob' } + heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC') def __init__( self, @@ -336,7 +337,6 @@ def __init__( self.do_pickle = do_pickle super().__init__(*args, **kwargs) - self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC') self.max_threads = conf.getint('scheduler', 'max_threads') if log: @@ -362,6 +362,27 @@ def _exit_gracefully(self, signum, frame): self.processor_agent.end() sys.exit(os.EX_OK) + def is_alive(self, grace_multiplier=None): + """ + Is this SchedulerJob alive? + + We define alive as in a state of running and a heartbeat within the + threshold defined in the ``scheduler_health_check_threshold`` config + setting. + + ``grace_multiplier`` is accepted for compatibility with the parent class. 
+ + :rtype: boolean + """ + if grace_multiplier is not None: + # Accept the same behaviour as superclass + return super().is_alive(grace_multiplier=grace_multiplier) + scheduler_health_check_threshold = conf.getint('scheduler', 'scheduler_health_check_threshold') + return ( + self.state == State.RUNNING and + (timezone.utcnow() - self.latest_heartbeat).total_seconds() < scheduler_health_check_threshold + ) + @provide_session def manage_slas(self, dag, session=None): """ diff --git a/airflow/macros/__init__.py b/airflow/macros/__init__.py index 4b533d10b7466..27205a22c8d2d 100644 --- a/airflow/macros/__init__.py +++ b/airflow/macros/__init__.py @@ -66,6 +66,23 @@ def ds_format(ds, input_format, output_format): return datetime.strptime(ds, input_format).strftime(output_format) +def datetime_diff_for_humans(dt, since=None): + """ + Return a human-readable/approximate difference between two datetimes, or + one and now. + + :param dt: The datetime to display the diff for + :type dt: datetime + :param since: When to display the date from. If ``None`` then the diff is + between ``dt`` and now. + :type since: None or datetime + :rtype: str + """ + import pendulum + + return pendulum.instance(dt).diff_for_humans(since) + + def _integrate_plugins(): """Integrate plugins to the context""" import sys diff --git a/airflow/www/templates/appbuilder/baselayout.html b/airflow/www/templates/appbuilder/baselayout.html index 9cf6145cb8053..0b5b82df63afb 100644 --- a/airflow/www/templates/appbuilder/baselayout.html +++ b/airflow/www/templates/appbuilder/baselayout.html @@ -46,6 +46,16 @@
{% block messages %} {% include 'appbuilder/flash.html' %} + {% if scheduler_job is defined and (not scheduler_job or not scheduler_job.is_alive()) %} +
+

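The scheduler does not appear to be running. + {% if scheduler_job %} + Last heartbeat was received <time title="{{ scheduler_job.latest_heartbeat.isoformat() }}">{{ macros.datetime_diff_for_humans(scheduler_job.latest_heartbeat) }}</time>. + {% endif %} +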

+

The DAGs list may not update, and new tasks will not be scheduled.

+
+ {% endif %} {% endblock %} {% block content %} {% endblock %} @@ -70,6 +80,7 @@ // below variables are used in base.js var hostName = '{{ hostname }}'; var csrfToken = '{{ csrf_token() }}'; + $("time[title]").tooltip() {% endblock %} diff --git a/airflow/www/views.py b/airflow/www/views.py index 53e400fe69e98..d5ac9808fffaa 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -40,9 +40,10 @@ from flask_appbuilder.actions import action from flask_appbuilder.models.sqla.filters import BaseFilter from flask_babel import lazy_gettext +import lazy_object_proxy from pygments import highlight, lexers from pygments.formatters import HtmlFormatter -from sqlalchemy import func, or_, desc, and_, union_all +from sqlalchemy import or_, desc, and_, union_all from wtforms import SelectField, validators import airflow @@ -150,6 +151,19 @@ def show_traceback(error): class AirflowBaseView(BaseView): route_base = '' + # Make our macros available to our UI templates too. + extra_args = { + 'macros': airflow.macros, + } + + def render_template(self, *args, **kwargs): + return super().render_template( + *args, + # Cache this at most once per request, not for the lifetime of the view instance + scheduler_job=lazy_object_proxy.Proxy(jobs.SchedulerJob.most_recent_job), + **kwargs + ) + class Airflow(AirflowBaseView): @expose('/health') @@ -160,31 +174,25 @@ def health(self, session=None): including metadatabase and scheduler. 
""" - BJ = jobs.BaseJob - payload = {} - scheduler_health_check_threshold = timedelta(seconds=conf.getint('scheduler', - 'scheduler_health_check_threshold' - )) + payload = { + 'metadatabase': {'status': 'unhealthy'} + } latest_scheduler_heartbeat = None + scheduler_status = 'unhealthy' payload['metadatabase'] = {'status': 'healthy'} try: - latest_scheduler_heartbeat = session.query(func.max(BJ.latest_heartbeat)).\ - filter(BJ.state == 'running', BJ.job_type == 'SchedulerJob').\ - scalar() + scheduler_job = jobs.SchedulerJob.most_recent_job() + + if scheduler_job: + latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat() + if scheduler_job.is_alive(): + scheduler_status = 'healthy' except Exception: payload['metadatabase']['status'] = 'unhealthy' - if not latest_scheduler_heartbeat: - scheduler_status = 'unhealthy' - else: - if timezone.utcnow() - latest_scheduler_heartbeat <= scheduler_health_check_threshold: - scheduler_status = 'healthy' - else: - scheduler_status = 'unhealthy' - payload['scheduler'] = {'status': scheduler_status, - 'latest_scheduler_heartbeat': str(latest_scheduler_heartbeat)} + 'latest_scheduler_heartbeat': latest_scheduler_heartbeat} return wwwutils.json_response(payload) diff --git a/setup.py b/setup.py index 7a67635c711e9..8e1c1f294f7ea 100644 --- a/setup.py +++ b/setup.py @@ -328,6 +328,7 @@ def do_setup(): 'iso8601>=0.1.12', 'json-merge-patch==0.2', 'jinja2>=2.10.1, <2.11.0', + 'lazy_object_proxy~=1.3', 'markdown>=2.5.2, <3.0', 'pandas>=0.17.1, <1.0.0', 'pendulum==1.4.4', diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py index 847b241a61ffb..a6eeb38c442a7 100644 --- a/tests/jobs/test_base_job.py +++ b/tests/jobs/test_base_job.py @@ -18,11 +18,14 @@ # under the License. 
# +import datetime import unittest from airflow import configuration from airflow.jobs import BaseJob +from airflow.utils import timezone from airflow.utils.state import State +from airflow.utils.db import create_session configuration.load_test_config() @@ -33,9 +36,9 @@ class TestJob(BaseJob): 'polymorphic_identity': 'TestJob' } - def __init__(self, cb): + def __init__(self, cb, **kwargs): self.cb = cb - super().__init__() + super().__init__(**kwargs) def _execute(self): return self.cb() @@ -65,3 +68,34 @@ def abort(): self.assertEqual(job.state, State.FAILED) self.assertIsNotNone(job.end_date) + + def test_most_recent_job(self): + + with create_session() as session: + old_job = self.TestJob(None, heartrate=10) + old_job.latest_heartbeat = old_job.latest_heartbeat - datetime.timedelta(seconds=20) + job = self.TestJob(None, heartrate=10) + session.add(job) + session.add(old_job) + session.flush() + + self.assertEqual( + self.TestJob.most_recent_job(session=session), + job + ) + + session.rollback() + + def test_is_alive(self): + job = self.TestJob(None, heartrate=10, state=State.RUNNING) + self.assertTrue(job.is_alive()) + + job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20) + self.assertTrue(job.is_alive()) + + job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=21) + self.assertFalse(job.is_alive()) + + job.state = State.SUCCESS + job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10) + self.assertFalse(job.is_alive(), "Completed jobs even with recent heartbeat should not be alive") diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index b0dfce41d0da3..739c19fa81ab3 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -96,6 +96,20 @@ def getboolean(section, key): def tearDownClass(cls): cls.patcher.stop() + def test_is_alive(self): + job = SchedulerJob(None, heartrate=10, state=State.RUNNING) + self.assertTrue(job.is_alive()) + + 
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20) + self.assertTrue(job.is_alive()) + + job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31) + self.assertFalse(job.is_alive()) + + job.state = State.SUCCESS + job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10) + self.assertFalse(job.is_alive(), "Completed jobs even with recent heartbeat should not be alive") + def run_single_scheduler_loop_with_no_dags(self, dags_folder): """ Utility function that runs a single scheduler loop without actually diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 47354d3921063..e2ab673db05f1 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -383,7 +383,7 @@ def test_health(self): self.assertEqual('healthy', resp_json['metadatabase']['status']) self.assertEqual('healthy', resp_json['scheduler']['status']) - self.assertEqual(str(last_scheduler_heartbeat_for_testing_1), + self.assertEqual(last_scheduler_heartbeat_for_testing_1.isoformat(), resp_json['scheduler']['latest_scheduler_heartbeat']) self.session.query(BaseJob).\ @@ -408,7 +408,7 @@ def test_health(self): self.assertEqual('healthy', resp_json['metadatabase']['status']) self.assertEqual('unhealthy', resp_json['scheduler']['status']) - self.assertEqual(str(last_scheduler_heartbeat_for_testing_2), + self.assertEqual(last_scheduler_heartbeat_for_testing_2.isoformat(), resp_json['scheduler']['latest_scheduler_heartbeat']) self.session.query(BaseJob).\ @@ -429,8 +429,7 @@ def test_health(self): self.assertEqual('healthy', resp_json['metadatabase']['status']) self.assertEqual('unhealthy', resp_json['scheduler']['status']) - self.assertEqual('None', - resp_json['scheduler']['latest_scheduler_heartbeat']) + self.assertIsNone(resp_json['scheduler']['latest_scheduler_heartbeat']) def test_home(self): resp = self.client.get('home', follow_redirects=True)