diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 015da426c4f21..b3161f49b7baf 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2302,6 +2302,15 @@ scheduler: type: integer example: ~ default: "5" + local_task_job_heartbeat_sec: + description: | + The frequency (in seconds) at which the LocalTaskJob should send heartbeat signals to the + scheduler to notify it's still alive. If this value is set to 0, the heartbeat interval will default + to the value of scheduler_zombie_task_threshold. + version_added: 2.7.0 + type: integer + example: ~ + default: "0" num_runs: description: | The number of times to try to schedule each DAG file diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index cff56508df4bf..a71bd3ccaadf3 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1195,6 +1195,11 @@ job_heartbeat_sec = 5 # how often the scheduler should run (in seconds). scheduler_heartbeat_sec = 5 +# The frequency (in seconds) at which the LocalTaskJob should send heartbeat signals to the +# scheduler to notify it's still alive. If this value is set to 0, the heartbeat interval will default +# to the value of scheduler_zombie_task_threshold. +local_task_job_heartbeat_sec = 0 + # The number of times to try to schedule each DAG file # -1 indicates unlimited number num_runs = -1 diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index fd234e4150d6b..6184a3e7fc42e 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -157,8 +157,11 @@ def sigusr2_debug_handler(signum, frame): return_code = None try: self.task_runner.start() - - heartbeat_time_limit = conf.getint("scheduler", "scheduler_zombie_task_threshold") + local_task_job_heartbeat_sec = conf.getint("scheduler", "local_task_job_heartbeat_sec") + if local_task_job_heartbeat_sec < 1: + heartbeat_time_limit = conf.getint("scheduler", "scheduler_zombie_task_threshold") + else: + heartbeat_time_limit = local_task_job_heartbeat_sec # LocalTaskJob should not run callbacks, which are handled by TaskInstance._run_raw_task # 1, LocalTaskJob does not parse DAG, thus cannot run callbacks