class MiqServer::WorkerManagement::Kubernetes < MiqServer::WorkerManagement
  class_attribute :current_pods
  self.current_pods = Concurrent::Hash.new

  class_attribute :current_deployments
  self.current_deployments = Concurrent::Hash.new

  attr_accessor :deployments_monitor_thread, :pods_monitor_thread

  def sync_from_system
    # All miq_server instances have to reside on the same Kubernetes cluster, so
    # we only have to sync the list of pods and deployments once
    ensure_kube_monitors_started if my_server_is_primary?

    # Update worker deployments with updated settings such as cpu/memory limits
    sync_deployment_settings
  end
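
  # Flip "starting" workers to "started" once their pod is Running and the
  # worker's own container reports both ready and started.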
  def sync_starting_workers
    MiqWorker.find_all_starting.each do |worker|
      next if worker.class.rails_worker?

      worker_pod = get_pod(worker[:system_uid])
      next if worker_pod.nil? # guard: the pod may not have been created yet

      container_status = worker_pod.status.containerStatuses.find { |container| container.name == worker.worker_deployment_name }
      if worker_pod.status.phase == "Running" && container_status&.ready && container_status&.started
        worker.update!(:status => "started")
      end
    end
  end
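
  # A "stopping" worker is considered stopped once its pod no longer exists.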
  def sync_stopping_workers
    MiqWorker.find_all_stopping.reject { |w| w.class.rails_worker? }.each do |worker|
      next if get_pod(worker[:system_uid]).present?

      worker.update!(:status => MiqWorker::STATUS_STOPPED)
    end
  end
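
  # Pod scheduling is delegated to the Kubernetes scheduler, so from this
  # manager's perspective there is always enough resource to start a worker.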
  def enough_resource_to_start_worker?(_worker_class)
    true
  end
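
  # Remove worker rows that have no corresponding pod. The current_pods cache
  # must be populated first, otherwise every row would look orphaned.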
  def cleanup_orphaned_worker_rows
    return if current_pods.empty?

    orphaned_rows = miq_workers.where.not(:system_uid => current_pods.keys)
    return if orphaned_rows.empty?

    _log.warn("Removing orphaned worker rows without corresponding pods: #{orphaned_rows.collect(&:system_uid).inspect}")
    orphaned_rows.destroy_all
  end

  def cleanup_orphaned_workers
    orphaned_pods = current_pods.keys - miq_workers.pluck(:system_uid)
    return if orphaned_pods.empty?

    # TODO: destroy orphaned pods
    orphaned_pods.each { |_pod| }
  end

  def cleanup_failed_workers
    super
    delete_failed_deployments
  end
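
  # A deployment is flagged as failed when any of its pods has a terminated
  # container and more than restart_count total container restarts, e.g. a
  # hypothetical cache entry like
  #   {:label_name => "1-generic", :last_state_terminated => true, :container_restarts => 6}
  # would be flagged with the default threshold of 5.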
  def failed_deployments(restart_count = 5)
    # TODO: This logic might flag deployments that are hitting memory/cpu limits or otherwise not really 'failed'
    current_pods.values.select { |h| h[:last_state_terminated] && h.fetch(:container_restarts, 0) > restart_count }.pluck(:label_name).uniq
  end

  def sync_deployment_settings
    checked_deployments = Set.new
    miq_workers.each do |worker|
      next if checked_deployments.include?(worker.worker_deployment_name)

      if deployment_resource_constraints_changed?(worker)
        _log.info("Constraints changed, patching deployment: [#{worker.worker_deployment_name}]")
        begin
          worker.patch_deployment
        rescue => err
          _log.warn("Failure patching deployment: [#{worker.worker_deployment_name}] for worker: id: [#{worker.id}], system_uid: [#{worker.system_uid}]. Error: [#{err}]... skipping")
          next
        end
      end
      checked_deployments << worker.worker_deployment_name
    end
  end

  def deployment_resource_constraints_changed?(worker)
    return false unless ::Settings.server.worker_monitor.enforce_resource_constraints

    container = current_deployments.fetch_path(worker.worker_deployment_name, :spec, :template, :spec, :containers).try(:first)
    current_constraints = container.try(:fetch, :resources, nil) || {}
    desired_constraints = worker.resource_constraints

    constraints_changed?(current_constraints, desired_constraints)
  end
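
  # Compare current vs. desired constraint hashes. Values are normalized, so
  # equivalent quantities compare equal, e.g. (hypothetical values):
  #   constraints_changed?({:limits => {:cpu => "1"}}, {:limits => {:cpu => "1000m"}}) # => false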
  def constraints_changed?(current, desired)
    if current.present? && desired.present?
      !cpu_value_eql?(current.fetch_path(:requests, :cpu), desired.fetch_path(:requests, :cpu)) ||
        !cpu_value_eql?(current.fetch_path(:limits, :cpu), desired.fetch_path(:limits, :cpu)) ||
        !mem_value_eql?(current.fetch_path(:requests, :memory), desired.fetch_path(:requests, :memory)) ||
        !mem_value_eql?(current.fetch_path(:limits, :memory), desired.fetch_path(:limits, :memory))
    else
      # current, no desired    => changed
      # no current, desired    => changed
      # no current, no desired => unchanged
      current.blank? ^ desired.blank?
    end
  end

  private

  # In podified there is only one "primary" miq_server, whose zone is "default";
  # the other miq_server instances exist simply to allow for additional zones.
  def my_server_is_primary?
    my_server.zone&.name == "default"
  end
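
  # e.g. cpu_value_eql?("500m", "0.5") # => true; both normalize to 500 millicores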
  def cpu_value_eql?(current, desired)
    # Convert to millicores if not already converted: "1" -> 1000; "1000m" -> 1000
    current = current.to_s[-1] == "m" ? current.to_f : current.to_f * 1000
    desired = desired.to_s[-1] == "m" ? desired.to_f : desired.to_f * 1000

    current == desired
  end
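
  # Relies on String#iec_60027_2_to_i (from the more_core_extensions gem) to
  # normalize binary-prefixed quantities, e.g. "1Gi" and "1024Mi" both parse
  # to 1073741824 bytes and so compare equal.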
  def mem_value_eql?(current, desired)
    current.try(:iec_60027_2_to_i) == desired.try(:iec_60027_2_to_i)
  end
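
  # Spawn a background thread running monitor_pods or monitor_deployments.
  # HTTP::ConnectionError is logged without a backtrace since a dropped watch
  # connection is an anticipated failure; any other error is logged with a
  # full backtrace before the thread exits.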
  def start_kube_monitor(resource = :pods)
    require 'http'
    Thread.new do
      _log.info("Started new #{resource} monitor thread of #{Thread.list.length} total")
      begin
        send(:"monitor_#{resource}")
      rescue HTTP::ConnectionError => e
        _log.error("Exiting #{resource} monitor thread due to [#{e.class.name}]: #{e}")
      rescue => e
        _log.error("Exiting #{resource} monitor thread after uncaught error")
        _log.log_backtrace(e)
      end
    end
  end
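
  # Start (or restart) the monitor thread for each resource type. A thread
  # that has died is joined first so it is fully cleaned up before a
  # replacement is spawned.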
  def ensure_kube_monitors_started
    [:deployments, :pods].each do |resource|
      getter = "#{resource}_monitor_thread"
      thread = send(getter)

      if thread.nil? || !thread.alive?
        if !thread.nil? && thread.status.nil?
          dead_thread = thread
          send(:"#{getter}=", nil)
          _log.info("Waiting for the #{getter} Monitor Thread to exit...")
          dead_thread.join
        end

        send(:"#{getter}=", start_kube_monitor(resource))
      end
    end
  end

  def delete_failed_deployments
    return unless my_server_is_primary?

    failed_deployments.each do |failed|
      orchestrator.delete_deployment(failed)
    end
  end

  def orchestrator
    @orchestrator ||= ContainerOrchestrator.new
  end
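
  # Both monitor loops follow the same pattern: clear the cache, re-list to
  # repopulate it and obtain a fresh resourceVersion, then block in
  # watch_for_events until the watch errors out, and start over.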
  def monitor_deployments
    loop do
      current_deployments.clear
      resource_version = collect_initial(:deployments)

      watch_for_events(:deployments, resource_version)
    end
  end

  def monitor_pods
    loop do
      current_pods.clear
      resource_version = collect_initial(:pods)

      # watch_for_events doesn't return unless an error caused us to break out
      # of it, so we'll start over again
      watch_for_events(:pods, resource_version)
    end
  end
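
  # List the current objects into the cache and return the collection's
  # resourceVersion, which the subsequent watch picks up from.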
  def collect_initial(resource = :pods)
    objects = orchestrator.send(:"get_#{resource}")
    objects.each { |p| send(:"save_#{resource.to_s.singularize}", p) }

    objects.resourceVersion
  end

  def watch_for_events(resource, resource_version)
    orchestrator.send(:"watch_#{resource}", resource_version).each do |event|
      case event.type.downcase
      when "added", "modified"
        send(:"save_#{resource.to_s.singularize}", event.object)
      when "deleted"
        send(:"delete_#{resource.to_s.singularize}", event.object)
      when "error"
        if (status = event.object)
          # OCP 3 appears to return 'ERROR' watch events with the object containing the 410 code and "Gone" reason like below:
          # #<Kubeclient::Resource type="ERROR", object={:kind=>"Status", :apiVersion=>"v1", :metadata=>{}, :status=>"Failure", :message=>"too old resource version: 199900 (27177196)", :reason=>"Gone", :code=>410}>
          log_resource_error_event(status.code, status.message, status.reason)
        end

        break
      end
    end
  end

  def log_resource_error_event(code, message, reason)
    _log.warn("Restarting watch_for_events due to error: [#{code} #{reason}], [#{message}]")
  end
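
  # Cache just the deployment's spec, keyed by name; merge! refreshes the
  # spec without discarding an entry that already exists.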
  def save_deployment(deployment)
    name = deployment.metadata.name
    new_hash = Concurrent::Hash.new
    new_hash[:spec] = deployment.spec.to_h

    current_deployments[name] ||= new_hash
    current_deployments[name].merge!(new_hash)
  end

  def delete_deployment(deployment)
    current_deployments.delete(deployment.metadata.name)
  end
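
  # Cache the subset of pod state used by failed_deployments: the pod's
  # "name" label, whether any container last terminated, and the total
  # restart count across containers.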
  def save_pod(pod)
    return unless pod.status.containerStatuses

    ch = Concurrent::Hash.new
    ch[:label_name] = pod.metadata.labels.name
    ch[:last_state_terminated] = pod.status.containerStatuses.any? { |cs| cs.lastState.terminated }
    ch[:container_restarts] = pod.status.containerStatuses.sum { |cs| cs.restartCount.to_i }

    name = pod.metadata.name
    current_pods[name] ||= ch
    current_pods[name].merge!(ch)
  end

  def delete_pod(pod)
    current_pods.delete(pod.metadata.name)
  end

  def get_pod(pod_name)
    orchestrator.get_pods.find { |pod| pod.metadata[:name] == pod_name }
  end
end