-
Notifications
You must be signed in to change notification settings - Fork 900
/
Copy pathdequeue.rb
116 lines (99 loc) · 4.24 KB
/
dequeue.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
module MiqServer::WorkerManagement::Dequeue
extend ActiveSupport::Concern
# Asks MiqQueue.peek for candidate rows of one queue.
#
# The conditions passed are the given queue name and priority plus the roles
# held in @active_role_names; only the id, lock_version, priority and role
# columns are selected.
#
# @param queue_name the queue name used in the conditions
# @param priority   the priority value used in the conditions
# @param limit      the :limit handed to MiqQueue.peek
# @return whatever MiqQueue.peek returns
def peek(queue_name, priority, limit)
  filter = {
    :queue_name => queue_name,
    :priority   => priority,
    :role       => @active_role_names
  }

  MiqQueue.peek(:conditions => filter, :select => "id, lock_version, priority, role", :limit => limit)
end
# Returns the dequeue method configured for a worker class, as a Symbol.
#
# The value is read from @child_worker_settings under the class's
# settings_name; when no :dequeue_method is set, :drb is returned.
#
# @param worker_class an object responding to settings_name
# @return [Symbol]
def get_worker_dequeue_method(worker_class)
  worker_settings = @child_worker_settings[worker_class.settings_name]
  configured      = worker_settings[:dequeue_method]

  configured ? configured.to_sym : :drb
end
# Discards every prefetched message by replacing @queue_messages with an
# empty Hash while holding @queue_messages_lock in :EX mode.
def reset_queue_messages
  @queue_messages_lock.synchronize(:EX) { @queue_messages = {} }
end
# Returns the queue priority to use for a worker record.
#
# Uses the worker class's queue_priority when the class responds to it,
# otherwise falls back to MiqQueue::MIN_PRIORITY.
#
# @param w [Hash] worker record; only w[:class] is read
def get_queue_priority_for_worker(w)
  worker_class = w[:class]
  return worker_class.queue_priority if worker_class.respond_to?(:queue_priority)

  MiqQueue::MIN_PRIORITY
end
# Removes and returns the first prefetched message that worker +w+ may take.
#
# A cached message is eligible when both hold:
# * MiqQueue.lower_priority?(message priority, worker priority) is false, and
# * the worker class has no required roles, or the message has no role, or
#   the message's role is included in the worker class's required roles.
#
# The scan runs while holding @queue_messages_lock in :EX mode.
#
# @param w [Hash, nil] worker record; reads w[:queue_name] and w[:class]
# @return [Hash, nil] the message removed from the cache, or nil when +w+ is
#   nil, has no queue name, the queue has no cached message Array, or no
#   cached message is eligible
def get_queue_message_for_worker(w)
  return nil if w.nil? || w[:queue_name].nil?

  @queue_messages_lock.synchronize(:EX) do
    queue_hash = @queue_messages[w[:queue_name]]
    return nil unless queue_hash.kind_of?(Hash)

    messages = queue_hash[:messages]
    return nil unless messages.kind_of?(Array)

    # Both values depend only on the worker, not on the message, so they are
    # resolved at most once per call. They are filled in lazily so that
    # nothing is looked up unless a message actually reaches the check.
    worker_priority = nil
    required_roles  = nil

    index = messages.index do |msg|
      next false if msg.nil?

      worker_priority ||= get_queue_priority_for_worker(w)
      next false if MiqQueue.lower_priority?(msg[:priority], worker_priority)

      required_roles ||= w[:class].required_roles
      required_roles.blank? || msg[:role].blank? || required_roles.to_miq_a.include?(msg[:role])
    end

    # Explicit returns keep the result independent of what synchronize returns.
    return nil if index.nil?
    return messages.delete_at(index)
  end
end
# Hands the next eligible prefetched message to the worker with the given pid.
#
# The worker's heartbeat is updated first, in every case. The worker record
# is then looked up in @workers while holding @workers_lock in :SH mode.
#
# @param pid key of the worker in @workers
# @return [Array, nil] [id, lock_version] of the message, or nil when there is
#   no @workers_lock or no message for that worker
def get_queue_message(pid)
  update_worker_last_heartbeat(pid)
  return nil if @workers_lock.nil?

  @workers_lock.synchronize(:SH) do
    message = get_queue_message_for_worker(@workers[pid])
    [message[:id], message[:lock_version]] if message
  end
end
# Reads server.prefetch_stale_threshold from ::Settings and converts it with
# to_i_with_method.
def prefetch_stale_threshold
  configured = ::Settings.server.prefetch_stale_threshold
  configured.to_i_with_method
end
# Tells whether the cache for a queue has run low.
#
# Compares the number of cached messages with
# ::Settings.server.prefetch_min_per_worker_dequeue * wcount, holding
# @queue_messages_lock in :SH mode.
#
# @param queue_name key into @queue_messages
# @param wcount     multiplier applied to the per-worker minimum
# @return [Boolean] false when the queue has no :messages entry; otherwise
#   true when the cached count is at or below the computed minimum
def prefetch_below_threshold?(queue_name, wcount)
  @queue_messages_lock.synchronize(:SH) do
    return false unless @queue_messages.key_path?(queue_name, :messages)

    cached_count = @queue_messages[queue_name][:messages].length
    minimum      = ::Settings.server.prefetch_min_per_worker_dequeue * wcount
    return cached_count <= minimum
  end
end
# Tells whether the cache entry for a queue is too old.
#
# Holds @queue_messages_lock in :SH mode while reading the entry.
#
# @param queue_name key into @queue_messages
# @return [Boolean] true when there is no entry for the queue, or when the
#   time elapsed since the entry's :timestamp exceeds prefetch_stale_threshold
def prefetch_stale?(queue_name)
  @queue_messages_lock.synchronize(:SH) do
    cached = @queue_messages[queue_name]
    return true if cached.nil?

    age = Time.now.utc - cached[:timestamp]
    return age > prefetch_stale_threshold
  end
end
# Checks whether MiqQueue holds work that outranks the head of the cache.
#
# Takes the priority of the first cached message, asks MiqQueue.priority for
# the value one step :higher, and peeks the queue at that priority with a
# limit of 1. Holds @queue_messages_lock in :SH mode throughout.
#
# @param queue_name key into @queue_messages
# @return [Boolean] true when the queue has no cache entry, no :messages, or
#   an empty message list; otherwise whether the peek returned any row
def prefetch_has_lower_priority_than_miq_queue?(queue_name)
  @queue_messages_lock.synchronize(:SH) do
    cached = @queue_messages[queue_name]
    return true if cached.nil? || cached[:messages].nil?

    head = cached[:messages].first
    return true if head.nil?

    next_higher = MiqQueue.priority(head[:priority], :higher, 1)
    return peek(queue_name, next_higher, 1).any?
  end
end
# Summarises the :drb-dequeuing workers per queue name.
#
# Walks @workers while holding @workers_lock in :SH mode. Workers without a
# queue name or class, and workers whose dequeue method is not :drb, are
# ignored.
#
# @return [Hash] queue name => [worker count, priority], where priority
#   starts at MiqQueue::MAX_PRIORITY and is folded with each worker's queue
#   priority through MiqQueue.lower_priority. Empty when @workers_lock is nil.
def get_worker_count_and_priority_by_queue_name
  summary = {}
  return summary if @workers_lock.nil?

  @workers_lock.synchronize(:SH) do
    @workers.each do |_pid, worker|
      queue_name = worker[:queue_name]
      next if queue_name.nil? || worker[:class].nil?
      next if get_worker_dequeue_method(worker[:class]) != :drb

      count, priority = summary[queue_name] || [0, MiqQueue::MAX_PRIORITY]
      summary[queue_name] = [count + 1, MiqQueue.lower_priority(get_queue_priority_for_worker(worker), priority)]
    end
  end

  summary
end
# Refreshes the prefetched message cache for every queue that has workers.
#
# For each queue reported by get_worker_count_and_priority_by_queue_name, the
# cache is refilled when it is below the minimum threshold, stale, or headed
# by a message that the queue can outrank. A refill stamps the entry with the
# current UTC time, then peeks up to
# ::Settings.server.prefetch_max_per_worker_dequeue * wcount rows and stores
# their id, lock_version, priority and role. A log line is written only when
# at least one row was fetched. The whole pass holds @queue_messages_lock in
# :EX mode.
def populate_queue_messages
  workers_by_queue = get_worker_count_and_priority_by_queue_name

  @queue_messages_lock.synchronize(:EX) do
    workers_by_queue.each do |queue_name, (wcount, priority)|
      needs_refill = prefetch_below_threshold?(queue_name, wcount) ||
                     prefetch_stale?(queue_name) ||
                     prefetch_has_lower_priority_than_miq_queue?(queue_name)
      next unless needs_refill

      # The timestamp is written before peeking, so it is in place even if the peek fails.
      cache = (@queue_messages[queue_name] ||= {})
      cache[:timestamp] = Time.now.utc

      fetch_limit = ::Settings.server.prefetch_max_per_worker_dequeue * wcount
      fetched = peek(queue_name, priority, fetch_limit).map do |row|
        {:id => row.id, :lock_version => row.lock_version, :priority => row.priority, :role => row.role}
      end
      cache[:messages] = fetched

      next unless fetched.length > 0

      _log.info("Fetched #{fetched.length} miq_queue rows for queue_name=#{queue_name}, wcount=#{wcount.inspect}, priority=#{priority.inspect}")
    end
  end
end
end