forked from ManageIQ/manageiq
-
Notifications
You must be signed in to change notification settings - Fork 1
/
dequeue.rb
138 lines (112 loc) · 4.6 KB
/
dequeue.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
module MiqServer::WorkerManagement::Dequeue
extend ActiveSupport::Concern
def peek(queue_name, priority, limit)
MiqQueue.peek(
:conditions => {:queue_name => queue_name, :priority => priority, :role => @active_role_names},
:select => "id, lock_version, priority, role",
:limit => limit
)
end
def get_worker_dequeue_method(worker_class)
(@child_worker_settings[worker_class.settings_name][:dequeue_method] || :drb).to_sym
end
def reset_queue_messages
@queue_messages_lock.synchronize(:EX) do
@queue_messages = {}
end
end
def get_queue_priority_for_worker(w)
w[:class].respond_to?(:queue_priority) ? w[:class].queue_priority : MiqQueue::MIN_PRIORITY
end
def get_queue_message_for_worker(w)
return nil if w.nil? || w[:queue_name].nil?
@queue_messages_lock.synchronize(:EX) do
queue_name = w[:queue_name]
queue_hash = @queue_messages[queue_name]
return nil unless queue_hash.kind_of?(Hash)
messages = queue_hash[:messages]
return nil unless messages.kind_of?(Array)
messages.each_index do |index|
msg = messages[index]
next if msg.nil?
next if MiqQueue.lower_priority?(msg[:priority], get_queue_priority_for_worker(w))
next unless w[:class].required_roles.blank? || msg[:role].blank? || Array.wrap(w[:class].required_roles).include?(msg[:role])
return messages.delete_at(index)
end
return nil
end
end
def get_queue_message(pid)
unless @workers_lock.nil?
@workers_lock.synchronize(:SH) do
w = @workers[pid]
msg = get_queue_message_for_worker(w)
[msg[:id], msg[:lock_version]] if msg
end
end
end
def prefetch_stale_threshold
::Settings.server.prefetch_stale_threshold.to_i_with_method
end
def prefetch_below_threshold?(queue_name, wcount)
@queue_messages_lock.synchronize(:SH) do
return false unless @queue_messages.key_path?(queue_name, :messages)
return (@queue_messages[queue_name][:messages].length <= (::Settings.server.prefetch_min_per_worker_dequeue * wcount))
end
end
def prefetch_stale?(queue_name)
@queue_messages_lock.synchronize(:SH) do
return true if @queue_messages[queue_name].nil?
return ((Time.now.utc - @queue_messages[queue_name][:timestamp]) > prefetch_stale_threshold)
end
end
def prefetch_has_lower_priority_than_miq_queue?(queue_name)
@queue_messages_lock.synchronize(:SH) do
return true if @queue_messages[queue_name].nil? || @queue_messages[queue_name][:messages].nil?
msg = @queue_messages[queue_name][:messages].first
return true if msg.nil?
return peek(queue_name, MiqQueue.priority(msg[:priority], :higher, 1), 1).any?
end
end
def get_worker_count_and_priority_by_queue_name
queue_names = {}
unless @workers_lock.nil?
@workers_lock.synchronize(:SH) do
@workers.each do |_pid, w|
next if w[:queue_name].nil?
next if w[:class].nil?
next unless get_worker_dequeue_method(w[:class]) == :drb
options = (queue_names[w[:queue_name]] ||= [0, MiqQueue::MAX_PRIORITY])
options[0] += 1
options[1] = MiqQueue.lower_priority(get_queue_priority_for_worker(w), options[1])
end
end
end
queue_names
end
def register_worker(worker_pid, worker_class, queue_name)
worker_class = worker_class.constantize if worker_class.kind_of?(String)
unless @workers_lock.nil?
@workers_lock.synchronize(:EX) do
worker_add(worker_pid)
h = @workers[worker_pid]
h[:class] ||= worker_class
h[:queue_name] ||= queue_name
end
end
end
def populate_queue_messages
queue_names = get_worker_count_and_priority_by_queue_name
@queue_messages_lock.synchronize(:EX) do
queue_names.each do |queue_name, (wcount, priority)|
next unless prefetch_below_threshold?(queue_name, wcount) || prefetch_stale?(queue_name) || prefetch_has_lower_priority_than_miq_queue?(queue_name)
@queue_messages[queue_name] ||= {}
@queue_messages[queue_name][:timestamp] = Time.now.utc
@queue_messages[queue_name][:messages] = peek(queue_name, priority, (::Settings.server.prefetch_max_per_worker_dequeue * wcount)).collect do |q|
{:id => q.id, :lock_version => q.lock_version, :priority => q.priority, :role => q.role}
end
_log.info("Fetched #{@queue_messages[queue_name][:messages].length} miq_queue rows for queue_name=#{queue_name}, wcount=#{wcount.inspect}, priority=#{priority.inspect}") if @queue_messages[queue_name][:messages].length > 0
end
end
end
end