diff --git a/app/models/miq_server/worker_management/kubernetes.rb b/app/models/miq_server/worker_management/kubernetes.rb index bb9fb528cac..841ad198549 100644 --- a/app/models/miq_server/worker_management/kubernetes.rb +++ b/app/models/miq_server/worker_management/kubernetes.rb @@ -23,10 +23,34 @@ def sync_from_system def sync_starting_workers starting = MiqWorker.find_all_starting + # Get a list of pods that aren't currently assigned to MiqWorker records + pods_without_workers = current_pods.keys - MiqWorker.server_scope.pluck(:system_uid).compact + # Non-rails workers cannot set their own miq_worker record to started once they # have finished initializing. Check for any starting non-rails workers whose # pod is running and mark the miq_worker as started. starting.reject(&:rails_worker?).each do |worker| + # If the current worker doesn't have a system_uid assigned then find the first + # pod available for our worker type and link them up. + if worker.system_uid.nil? + system_uid = pods_without_workers.detect { |pod_name| pod_name.start_with?(worker.worker_deployment_name) } + if system_uid + # We have found a pod for the current worker record so remove the pod from + # the list of pods without workers and set the pod name as the system_uid + # for the current worker record. + pods_without_workers.delete(system_uid) + worker.update!(:system_uid => system_uid) + else + # If we haven't found a pod for this worker record then we need to check + # whether it has been starting for too long and should be marked as + # not responding. + stop_worker(worker, MiqServer::WorkerManagement::NOT_RESPONDING) if exceeded_heartbeat_threshold?(worker) + # Without a valid system_uid we cannot run any further logic in this + # loop. + next + end + end + worker_pod = current_pods[worker.system_uid] next if worker_pod.nil? @@ -54,7 +78,11 @@ def enough_resource_to_start_worker?(_worker_class) def cleanup_orphaned_worker_rows unless current_pods.empty? 
+ # Any worker rows which have a system_uid that is not in the list of + # current pod names, and is not starting (aka hasn't had a system_uid set + # yet) should be deleted. orphaned_rows = miq_workers.where.not(:system_uid => current_pods.keys) + .where.not(:status => MiqWorker::STATUSES_STARTING) unless orphaned_rows.empty? _log.warn("Removing orphaned worker rows without corresponding pods: #{orphaned_rows.collect(&:system_uid).inspect}") orphaned_rows.destroy_all diff --git a/spec/models/miq_server/worker_management/kubernetes_spec.rb b/spec/models/miq_server/worker_management/kubernetes_spec.rb index 407dde8cf19..359706d7fba 100644 --- a/spec/models/miq_server/worker_management/kubernetes_spec.rb +++ b/spec/models/miq_server/worker_management/kubernetes_spec.rb @@ -214,13 +214,16 @@ before { MiqWorkerType.seed } context "podified" do - let(:rails_worker) { true } - let(:pod_name) { "1-generic-abcd" } - let(:pod_running) { true } - let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => pod_name) } - let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} } + let(:rails_worker) { true } + let(:pod_name) { "#{server.compressed_id}-generic-abcd" } + let(:pod_running) { true } + let(:last_heartbeat) { Time.now.utc } + let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => system_uid, :last_heartbeat => last_heartbeat) } + let(:system_uid) { pod_name } + let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} } before do + allow(worker.class).to receive(:containerized_worker?).and_return(true) allow(worker.class).to receive(:rails_worker?).and_return(rails_worker) server.worker_manager.current_pods = current_pods end @@ -270,6 +273,52 @@ 
expect(server.worker_manager.sync_starting_workers).to include(worker) end end + + context "with a worker that doesn't have a system_uid yet" do + let(:system_uid) { nil } + + before { server.worker_manager.sync_config } + + context "without a pod" do + let(:current_pods) { {} } + + it "returns the worker as still starting" do + expect(server.worker_manager.sync_starting_workers).to include(worker) + end + + context "with a worker that has been starting longer than the starting_timeout" do + let(:last_heartbeat) { 20.minutes.ago.utc } + + it "marks the worker as not responding" do + # Make sure that #find_worker returns our instance of worker that + # stubs the #stop_container method. + expect(server.worker_manager).to receive(:find_worker).with(worker).and_return(worker) + expect(worker).to receive(:stop_container) + + server.worker_manager.sync_starting_workers + expect(worker.reload.status).to eq("stopping") + end + end + end + + context "with a pod that is running" do + let(:pod_running) { true } + + it "sets the worker's system_uid and marks the worker started" do + expect(server.worker_manager.sync_starting_workers).to be_empty + expect(worker.reload.system_uid).to eq(pod_name) + end + end + + context "with a pod that isn't running" do + let(:pod_running) { false } + + it "sets the worker's system_uid and but doesn't mark the worker started" do + expect(server.worker_manager.sync_starting_workers).to include(worker) + expect(worker.reload.system_uid).to eq(pod_name) + end + end + end end end end