Skip to content

Commit

Permalink
replace scheduled heartbeat workers with more frequent culling
Browse files Browse the repository at this point in the history
  • Loading branch information
ezekg committed Feb 6, 2024
1 parent 5388359 commit fc1b739
Show file tree
Hide file tree
Showing 17 changed files with 36 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,6 @@ def ping
)
end

# Queue up heartbeat worker which will handle deactivating dead processes
jid = ProcessHeartbeatWorker.perform_in(
machine_process.interval + MachineProcess::HEARTBEAT_DRIFT,
machine_process.id,
)

machine_process.update(
heartbeat_jid: jid,
)

Keygen.logger.info {
"[process.heartbeat.ping] account_id=#{current_account.id} process_id=#{machine_process.id}" \
" process_status=#{machine_process.status} process_interval=#{machine_process.interval}" \
" process_jid=#{jid} process_jid_was=#{machine_process.heartbeat_jid_previously_was}"
}

render jsonapi: machine_process
rescue MachineProcess::ResurrectionUnsupportedError,
MachineProcess::ResurrectionExpiredError
Expand Down
15 changes: 0 additions & 15 deletions app/controllers/api/v1/machine_processes_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,6 @@ def create
authorize! machine_process

if machine_process.save
jid = ProcessHeartbeatWorker.perform_in(
machine_process.interval + MachineProcess::HEARTBEAT_DRIFT,
machine_process.id,
)

Keygen.logger.info {
"[process.heartbeat.start] account_id=#{current_account.id} process_id=#{machine_process.id}" \
" process_status=#{machine_process.status} process_interval=#{machine_process.interval}" \
" process_jid=#{jid}"
}

machine_process.update(
heartbeat_jid: jid,
)

BroadcastEventService.call(
event: 'process.created',
account: current_account,
Expand Down
16 changes: 0 additions & 16 deletions app/controllers/api/v1/machines/actions/heartbeats_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,6 @@ def ping
)
end

# Queue up heartbeat worker which will handle deactivating dead machines
jid = MachineHeartbeatWorker.perform_in(
machine.heartbeat_duration + Machine::HEARTBEAT_DRIFT,
machine.id,
)

Keygen.logger.info {
"[machine.heartbeat.ping] account_id=#{current_account.id} machine_id=#{machine.id}" \
" machine_status=#{machine.heartbeat_status} machine_interval=#{machine.heartbeat_duration}" \
" machine_jid=#{jid} machine_jid_was=#{machine.heartbeat_jid_previously_was}"
}

machine.update(
heartbeat_jid: jid,
)

render jsonapi: machine
rescue Machine::ResurrectionUnsupportedError,
Machine::ResurrectionExpiredError
Expand Down
17 changes: 0 additions & 17 deletions app/controllers/api/v1/machines_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,6 @@ def create
end

if machine.save
if machine.requires_heartbeat?
jid = MachineHeartbeatWorker.perform_in(
machine.heartbeat_duration + Machine::HEARTBEAT_DRIFT,
machine.id,
)

Keygen.logger.info {
"[machine.heartbeat.start] account_id=#{current_account.id} machine_id=#{machine.id}" \
" machine_status=#{machine.heartbeat_status} machine_interval=#{machine.heartbeat_duration}" \
" machine_jid=#{jid}"
}

machine.update(
heartbeat_jid: jid,
)
end

BroadcastEventService.call(
event: 'machine.created',
account: current_account,
Expand Down
18 changes: 2 additions & 16 deletions app/workers/cull_dead_machines_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,17 @@

class CullDeadMachinesWorker < BaseWorker
sidekiq_options queue: :cron,
lock: :until_executed, lock_ttl: 30.minutes, on_conflict: :raise,
lock: :until_executed, lock_ttl: 10.minutes, on_conflict: :raise,
cronitor_disabled: false

# In some cases, a machine can be orphaned from its heartbeat worker.
# For example, when a machine is started with a heartbeat duration
# of 600, but the policy's heartbeat duration is later changed to
# 86000, this will cause all in-progress heartbeat monitors to
# become out of sync if no further pings are sent. After death,
# this results in a zombie machine that needs to be culled.
#
# Another scenario is where a policy is created that does not require
# heartbeats, but is later updated to require heartbeats. Without
# this worker, previously created machines in an idle state would
# stick around even though they're required to have a heartbeat.
def perform
machines = Machine.joins(:policy)
.where.not(policies: { heartbeat_cull_strategy: 'ALWAYS_REVIVE' })
.where(heartbeat_jid: nil)
.dead

machines.find_each do |machine|
jid = MachineHeartbeatWorker.perform_in(
rand(60..600).seconds, # fan out to prevent a thundering herd
machine.id,
)
jid = MachineHeartbeatWorker.perform_async(machine.id)

Keygen.logger.info {
"[machine.heartbeat.cull] account_id=#{machine.account_id} machine_id=#{machine.id}" \
Expand Down
13 changes: 2 additions & 11 deletions app/workers/cull_dead_processes_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,17 @@

class CullDeadProcessesWorker < BaseWorker
sidekiq_options queue: :cron,
lock: :until_executed, lock_ttl: 30.minutes, on_conflict: :raise,
lock: :until_executed, lock_ttl: 10.minutes, on_conflict: :raise,
cronitor_disabled: false

# In some cases, a process can be orphaned from its heartbeat worker.
# For example, when a process is started with a heartbeat duration
# of 600, but the policy's heartbeat duration is later changed to
# 86000, this will cause all in-progress heartbeat monitors to
# become out of sync if no further pings are sent. After death,
# this results in a zombie process that needs to be culled.
def perform
processes = MachineProcess.joins(:policy)
.where.not(policies: { heartbeat_cull_strategy: 'ALWAYS_REVIVE' })
.where(heartbeat_jid: nil)
.dead

processes.find_each do |process|
jid = ProcessHeartbeatWorker.perform_in(
rand(60..600).seconds, # fan out to prevent a thundering herd
process.id,
)
jid = ProcessHeartbeatWorker.perform_async(process.id)

Keygen.logger.info {
"[process.heartbeat.cull] account_id=#{process.account.id} process_id=#{process.id}" \
Expand Down
2 changes: 1 addition & 1 deletion app/workers/machine_heartbeat_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def perform(machine_id)
machine.requires_heartbeat?

Keygen.logger.info {
"[machine.heartbeat.monitor] account_id=#{machine.account.id} machine_id=#{machine.id}" \
"[machine.heartbeat.worker] account_id=#{machine.account.id} machine_id=#{machine.id}" \
" machine_status=#{machine.heartbeat_status} machine_interval=#{machine.heartbeat_duration}" \
" machine_jid=#{machine.heartbeat_jid} jid=#{jid}"
}
Expand Down
2 changes: 1 addition & 1 deletion app/workers/process_heartbeat_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def perform(process_id)
process = MachineProcess.find(process_id)

Keygen.logger.info {
"[process.heartbeat.monitor] account_id=#{process.account.id} process_id=#{process.id}" \
"[process.heartbeat.worker] account_id=#{process.account.id} process_id=#{process.id}" \
" process_status=#{process.status} process_interval=#{process.interval}" \
" process_jid=#{process.heartbeat_jid} jid=#{jid}"
}
Expand Down
4 changes: 2 additions & 2 deletions config/schedule.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ check_for_license_overdue_check_ins:
class: "LicenseOverdueCheckInsWorker"

cull_dead_machines:
cron: "*/10 * * * *" # Every 10 mins
cron: "*/2 * * * *" # Every 2 mins
class: "CullDeadMachinesWorker"

cull_dead_processes:
cron: "*/10 * * * *" # Every 10 mins
cron: "*/2 * * * *" # Every 2 mins
class: "CullDeadProcessesWorker"

report_account_request_limits:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class AddIndexToHeartbeatJidForMachines < ActiveRecord::Migration[7.1]
def change
add_index :machines, :heartbeat_jid
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class AddIndexToHeartbeatJidForMachineProcesses < ActiveRecord::Migration[7.1]
def change
add_index :machine_processes, :heartbeat_jid
end
end
19 changes: 18 additions & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.0].define(version: 2024_01_22_032049) do
ActiveRecord::Schema[7.1].define(version: 2024_02_06_170649) do
# These are extensions that must be enabled in order to support this database
enable_extension "btree_gin"
enable_extension "pg_stat_statements"
Expand Down Expand Up @@ -194,6 +194,19 @@
t.index ["license_id"], name: "index_license_entitlements_on_license_id"
end

create_table "license_users", id: :uuid, default: -> { "uuid_generate_v4()" }, force: :cascade do |t|

This comment has been minimized.

Copy link
@ezekg

ezekg Feb 20, 2024

Author Member

This is cruft from #774. Reverted in aeef5a0.

t.uuid "account_id", null: false
t.uuid "environment_id"
t.uuid "license_id", null: false
t.uuid "user_id", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["account_id", "created_at"], name: "index_license_users_on_account_id_and_created_at", order: { created_at: :desc }
t.index ["environment_id"], name: "index_license_users_on_environment_id"
t.index ["license_id", "user_id", "account_id"], name: "index_license_users_on_license_id_and_user_id_and_account_id", unique: true
t.index ["user_id"], name: "index_license_users_on_user_id"
end

create_table "licenses", id: :uuid, default: -> { "uuid_generate_v4()" }, force: :cascade do |t|
t.string "key", null: false
t.datetime "expiry", precision: nil
Expand Down Expand Up @@ -268,6 +281,7 @@
t.index "machine_id, md5((pid)::text)", name: "index_machine_processes_on_machine_id_md5_pid", unique: true
t.index ["account_id"], name: "index_machine_processes_on_account_id"
t.index ["environment_id"], name: "index_machine_processes_on_environment_id"
t.index ["heartbeat_jid"], name: "index_machine_processes_on_heartbeat_jid"
t.index ["last_heartbeat_at"], name: "index_machine_processes_on_last_heartbeat_at"
t.index ["machine_id"], name: "index_machine_processes_on_machine_id"
end
Expand All @@ -291,6 +305,7 @@
t.datetime "last_check_out_at", precision: nil
t.uuid "environment_id"
t.string "heartbeat_jid"
t.uuid "owner_id"
t.index "license_id, md5((fingerprint)::text)", name: "machines_license_id_fingerprint_unique_idx", unique: true
t.index "to_tsvector('simple'::regconfig, COALESCE((id)::text, ''::text))", name: "machines_tsv_id_idx", using: :gist
t.index "to_tsvector('simple'::regconfig, COALESCE((metadata)::text, ''::text))", name: "machines_tsv_metadata_idx", using: :gist
Expand All @@ -300,9 +315,11 @@
t.index ["environment_id"], name: "index_machines_on_environment_id"
t.index ["fingerprint"], name: "machines_hash_fingerprint_idx", using: :hash
t.index ["group_id"], name: "index_machines_on_group_id"
t.index ["heartbeat_jid"], name: "index_machines_on_heartbeat_jid"
t.index ["id", "created_at", "account_id"], name: "index_machines_on_id_and_created_at_and_account_id", unique: true
t.index ["last_heartbeat_at"], name: "index_machines_on_last_heartbeat_at"
t.index ["license_id", "created_at"], name: "index_machines_on_license_id_and_created_at"
t.index ["owner_id"], name: "index_machines_on_owner_id"
end

create_table "metrics", id: :uuid, default: -> { "uuid_generate_v4()" }, force: :cascade do |t|
Expand Down
Loading

0 comments on commit fc1b739

Please sign in to comment.