Skip to content

Commit

Permalink
Merge pull request #14676 from kbrock/miq_queue_purge_es
Browse files Browse the repository at this point in the history
MiqQueue consistency with purging
  • Loading branch information
Fryguy authored Apr 10, 2017
2 parents 8c79fe3 + ecb49ef commit 932ac57
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 144 deletions.
11 changes: 0 additions & 11 deletions app/models/drift_state/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,6 @@ def purge_window_size
::Settings.drift_states.history.purge_window_size
end

def purge_timer
purge_queue(*purge_mode_and_value)
end

def purge_queue(mode, value)
MiqQueue.put_or_update(
:class_name => name,
:method_name => "purge",
) { |_msg, item| item.merge(:args => [mode, value]) }
end

def purge_count(mode, value)
send("purge_count_by_#{mode}", value)
end
Expand Down
29 changes: 4 additions & 25 deletions app/models/event_stream/purging.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
class EventStream < ApplicationRecord
module Purging
extend ActiveSupport::Concern
include PurgingMixin

module ClassMethods
def keep_events
Expand All @@ -14,33 +15,11 @@ def purge_date
end

def purge_window_size
::Settings.event_streams.history.purge_window_size || 1000
::Settings.event_streams.history.purge_window_size
end

def purge_timer
purge_queue(purge_date)
end

def purge_queue(ts)
MiqQueue.put(
:class_name => name,
:method_name => "purge",
:role => "event",
:queue_name => "ems",
:args => [ts],
)
end

def purge(older_than, window = nil, limit = nil)
_log.info("Purging #{limit || "all"} events older than [#{older_than}]...")

window ||= purge_window_size

total = where(arel_table[:timestamp].lteq(older_than)).delete_in_batches(window, limit) do |count, _total|
_log.info("Purging #{count} events.")
end

_log.info("Purging #{limit || "all"} events older than [#{older_than}]...Complete - Deleted #{total} records")
def purge_scope(older_than)
where(arel_table[:timestamp].lteq(older_than))
end
end
end
Expand Down
19 changes: 10 additions & 9 deletions app/models/metric/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,11 @@ def self.purge_realtime_timer(ts = nil)
end

def self.purge_timer(ts, interval)
MiqQueue.put_unless_exists(
MiqQueue.put(
:class_name => name,
:method_name => "purge_#{interval}",
:role => "ems_metrics_processor",
:queue_name => "ems_metrics_processor"
) do |_msg, find_options|
find_options.merge(:args => [ts])
end
:args => [ts],
)
end

def self.purge_window_size
Expand Down Expand Up @@ -75,18 +72,22 @@ def self.purge_associated_records(metric_type, ids)
end

def self.purge_daily(older_than, window = nil, total_limit = nil, &block)
purge(older_than, "daily", window, total_limit, &block)
purge_by_date(older_than, "daily", window, total_limit, &block)
end

def self.purge_hourly(older_than, window = nil, total_limit = nil, &block)
purge(older_than, "hourly", window, total_limit, &block)
purge_by_date(older_than, "hourly", window, total_limit, &block)
end

def self.purge_realtime(older_than, window = nil, total_limit = nil, &block)
purge(older_than, "realtime", window, total_limit, &block)
purge_by_date(older_than, "realtime", window, total_limit, &block)
end

def self.purge(older_than, interval, window = nil, total_limit = nil, &block)
purge_by_date(older_than, interval, window, total_limit, &block)
end

def self.purge_by_date(older_than, interval, window = nil, total_limit = nil, &block)
scope = purge_scope(older_than, interval)
window ||= purge_window_size
_log.info("Purging #{total_limit || "all"} #{interval} metrics older than [#{older_than}]...")
Expand Down
13 changes: 0 additions & 13 deletions app/models/miq_report_result/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@ def purge_window_size
::Settings.reporting.history.purge_window_size
end

def purge_timer
purge_queue(*purge_mode_and_value)
end

def purge_queue(mode, value)
MiqQueue.put_or_update(
:class_name => name,
:method_name => "purge",
:role => "reporting",
:queue_name => "reporting"
) { |_msg, item| item.merge(:args => [mode, value]) }
end

def purge_count(mode, value)
send("purge_count_by_#{mode}", value)
end
Expand Down
16 changes: 16 additions & 0 deletions app/models/mixins/purging_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ module PurgingMixin
extend ActiveSupport::Concern

module ClassMethods
def purge_mode_and_value
[:date, purge_date]
end

def purge_timer
purge_queue(*purge_mode_and_value)
end

def purge_queue(mode, value)
MiqQueue.put(
:class_name => name,
:method_name => "purge_by_#{mode}",
:args => [value]
)
end

def purge(older_than = nil, window = nil, &block)
purge_by_date(older_than || purge_date, window || purge_window_size, &block)
end
Expand Down
10 changes: 0 additions & 10 deletions app/models/policy_event/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ def purge_window_size
::Settings.policy_events.history.purge_window_size
end

def purge_queue
MiqQueue.put_unless_exists(
:class_name => name,
:method_name => "purge",
:role => "event",
:queue_name => "ems"
)
end
alias_method :purge_timer, :purge_queue

def purge_scope(older_than)
where(arel_table[:timestamp].lt(older_than))
end
Expand Down
11 changes: 4 additions & 7 deletions app/models/vmdb_metric/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,11 @@ def purge_hourly_timer(ts = nil)
end

def purge_timer(value, interval)
MiqQueue.put_unless_exists(
MiqQueue.put(
:class_name => name,
:method_name => "purge_#{interval}",
:role => "database_operations",
:queue_name => "generic",
) do |_msg, find_options|
find_options.merge(:args => [value])
end
:args => [value]
)
end

def purge_window_size
Expand All @@ -59,7 +56,7 @@ def purge_daily(older_than, window = nil, &block)
# queue is calling purge_interval directly. (and mode is no longer used)
# keeping around in case messages are in the queue for upgrades
def purge(_mode, older_than, interval, window = nil, &block)
send("purge_#{interval}", older_than, window, &block)
purge_by_date(older_than, interval, window, &block)
end

private
Expand Down
21 changes: 4 additions & 17 deletions spec/models/drift_state/purging_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge"
:method_name => "purge_by_date"
)
expect(q.first.args[0]).to eq(:date)
expect(q.first.args[1]).to be_same_time_as 6.months.to_i.seconds.ago.utc
expect(q.first.args[0]).to be_same_time_as 6.months.to_i.seconds.ago.utc
end
end

Expand All @@ -54,20 +53,8 @@
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:args => [:remaining, 1]
)
end

it "with item already in the queue" do
described_class.purge_queue(:remaining, 2)

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:args => [:remaining, 2]
:method_name => "purge_by_remaining",
:args => [1]
)
end
end
Expand Down
24 changes: 5 additions & 19 deletions spec/models/event_stream/purging_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,29 @@
let(:purge_time) { (Time.zone.now + 10).round }

it "submits to the queue" do
described_class.purge_queue(purge_time)
expect(described_class).to receive(:purge_date).and_return(purge_time)
described_class.purge_timer

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:method_name => "purge_by_date",
:args => [purge_time]
)
end
end

context ".purge_date" do
it "using '3.month' syntax" do
allow(described_class).to receive_messages(:keep_events => "3.months")
stub_settings(:event_streams => {:history => {:keep_events => "3.months"}})

# Exposes 3.months.seconds.ago.utc != 3.months.ago.utc
expect(described_class.purge_date).to be_within(2.days).of(3.months.ago.utc)
end

it "defaults to 6 months" do
allow(described_class).to receive_messages(:keep_events => nil)
stub_settings(:event_streams => {:history => {:keep_events => nil}})
expect(described_class.purge_date).to be_within(1.day).of(6.months.ago.utc)
end
end
Expand All @@ -55,21 +56,6 @@ def assert_unpurged_ids(unpurged_ids)
described_class.purge(purge_date, 1)
assert_unpurged_ids(@new_event.id)
end

it "with a limit" do
described_class.purge(purge_date, nil, 1)
assert_unpurged_ids([@purge_date_event.id, @new_event.id])
end

it "with window > limit" do
described_class.purge(purge_date, 2, 1)
assert_unpurged_ids([@purge_date_event.id, @new_event.id])
end

it "with limit > window" do
described_class.purge(purge_date, 1, 2)
assert_unpurged_ids(@new_event.id)
end
end
end
end
21 changes: 4 additions & 17 deletions spec/models/miq_report_result/purging_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge"
:method_name => "purge_by_date"
)

expect(q.first.args[0]).to eq(:date)
expect(q.first.args[1]).to be_same_time_as 6.months.to_i.seconds.ago.utc
expect(q.first.args[0]).to be_same_time_as 6.months.to_i.seconds.ago.utc
end
end

Expand All @@ -83,20 +82,8 @@
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:args => [:remaining, 1]
)
end

it "with item already in the queue" do
described_class.purge_queue(:remaining, 2)

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:args => [:remaining, 2]
:method_name => "purge_by_remaining",
:args => [1]
)
end
end
Expand Down
8 changes: 8 additions & 0 deletions spec/models/mixins/purging_mixin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
end
end

describe ".purge_mode_and_value" do
it "purge_mode_and_value should return proper options" do
stub_settings(:policy_events => {:history => {:keep_policy_events => 120}})
expect(example_class.purge_mode_and_value.first).to eq(:date)
expect(example_class.purge_mode_and_value.last).to be_within(1.second).of(120.seconds.ago.utc)
end
end

describe ".purge" do
let(:events) do
(-2..2).collect do |date_modifier|
Expand Down
18 changes: 3 additions & 15 deletions spec/models/policy_event/purging_spec.rb
Original file line number Diff line number Diff line change
@@ -1,30 +1,18 @@
describe PolicyEvent do
context "::Purging" do
context ".purge_queue" do
context ".purge_timer" do
before do
EvmSpecHelper.create_guid_miq_server_zone
end

it "with nothing in the queue" do
described_class.purge_queue
described_class.purge_timer

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge"
)
end

it "with item already in the queue" do
described_class.purge_queue
described_class.purge_queue

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge"
:method_name => "purge_by_date"
)
end
end
Expand Down
1 change: 0 additions & 1 deletion spec/models/vmdb_metric_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
it "should purge" do
expect do
VmdbMetric.purge_daily_timer
VmdbMetric.purge_daily_timer
end.to change { MiqQueue.count }.by(1)
end
end

0 comments on commit 932ac57

Please sign in to comment.