Skip to content

Commit

Permalink
Introduce Heartbeat Parameter to Allow Per-LocalTaskJob Configuration (
Browse files Browse the repository at this point in the history
…#32313)

* Refactor Job Heartbeat Parameter to Allow Per-Job Configuration

This pull request introduces changes to allow users to set heartbeat expectations separately for LocalTaskJob,. his resolves a current limitation where all job types share a single configuration parameter for expected heartbeat time.


related to: #30908
  • Loading branch information
pragnareddye authored Jul 15, 2023
1 parent 41e6119 commit 9b466bd
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
9 changes: 9 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,15 @@ scheduler:
type: integer
example: ~
default: "5"
local_task_job_heartbeat_sec:
description: |
The frequency (in seconds) at which the LocalTaskJob should send heartbeat signals to the
scheduler to notify it's still alive. If this value is set to 0, the heartbeat interval will default
to the value of scheduler_zombie_task_threshold.
version_added: 2.7.0
type: integer
example: ~
default: "0"
num_runs:
description: |
The number of times to try to schedule each DAG file
Expand Down
5 changes: 5 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,11 @@ job_heartbeat_sec = 5
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5

# The frequency (in seconds) at which the LocalTaskJob should send heartbeat signals to the
# scheduler to notify it's still alive. If this value is set to 0, the heartbeat interval will default
# to the value of scheduler_zombie_task_threshold.
local_task_job_heartbeat_sec = 0

# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1
Expand Down
7 changes: 5 additions & 2 deletions airflow/jobs/local_task_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ def sigusr2_debug_handler(signum, frame):
return_code = None
try:
self.task_runner.start()

heartbeat_time_limit = conf.getint("scheduler", "scheduler_zombie_task_threshold")
local_task_job_heartbeat_sec = conf.getint("scheduler", "local_task_job_heartbeat_sec")
if local_task_job_heartbeat_sec < 1:
heartbeat_time_limit = conf.getint("scheduler", "scheduler_zombie_task_threshold")
else:
heartbeat_time_limit = local_task_job_heartbeat_sec

# LocalTaskJob should not run callbacks, which are handled by TaskInstance._run_raw_task
# 1, LocalTaskJob does not parse DAG, thus cannot run callbacks
Expand Down

0 comments on commit 9b466bd

Please sign in to comment.