Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MiqQueue consistency with purging #14676

Merged
merged 7 commits into from
Apr 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions app/models/drift_state/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,6 @@ def purge_window_size
::Settings.drift_states.history.purge_window_size
end

def purge_timer
purge_queue(*purge_mode_and_value)
end

def purge_queue(mode, value)
MiqQueue.put_or_update(
:class_name => name,
:method_name => "purge",
) { |_msg, item| item.merge(:args => [mode, value]) }
end

def purge_count(mode, value)
send("purge_count_by_#{mode}", value)
end
Expand Down
29 changes: 4 additions & 25 deletions app/models/event_stream/purging.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
class EventStream < ApplicationRecord
module Purging
extend ActiveSupport::Concern
include PurgingMixin

module ClassMethods
def keep_events
Expand All @@ -14,33 +15,11 @@ def purge_date
end

def purge_window_size
::Settings.event_streams.history.purge_window_size || 1000
::Settings.event_streams.history.purge_window_size
end

def purge_timer
purge_queue(purge_date)
end

def purge_queue(ts)
MiqQueue.put(
:class_name => name,
:method_name => "purge",
:role => "event",
:queue_name => "ems",
:args => [ts],
)
end

def purge(older_than, window = nil, limit = nil)
_log.info("Purging #{limit || "all"} events older than [#{older_than}]...")

window ||= purge_window_size

total = where(arel_table[:timestamp].lteq(older_than)).delete_in_batches(window, limit) do |count, _total|
_log.info("Purging #{count} events.")
end

_log.info("Purging #{limit || "all"} events older than [#{older_than}]...Complete - Deleted #{total} records")
def purge_scope(older_than)
where(arel_table[:timestamp].lteq(older_than))
end
end
end
Expand Down
19 changes: 10 additions & 9 deletions app/models/metric/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,11 @@ def self.purge_realtime_timer(ts = nil)
end

def self.purge_timer(ts, interval)
MiqQueue.put_unless_exists(
MiqQueue.put(
:class_name => name,
:method_name => "purge_#{interval}",
:role => "ems_metrics_processor",
:queue_name => "ems_metrics_processor"
) do |_msg, find_options|
find_options.merge(:args => [ts])
end
:args => [ts],
)
end

def self.purge_window_size
Expand Down Expand Up @@ -75,18 +72,22 @@ def self.purge_associated_records(metric_type, ids)
end

def self.purge_daily(older_than, window = nil, total_limit = nil, &block)
purge(older_than, "daily", window, total_limit, &block)
purge_by_date(older_than, "daily", window, total_limit, &block)
end

def self.purge_hourly(older_than, window = nil, total_limit = nil, &block)
purge(older_than, "hourly", window, total_limit, &block)
purge_by_date(older_than, "hourly", window, total_limit, &block)
end

def self.purge_realtime(older_than, window = nil, total_limit = nil, &block)
purge(older_than, "realtime", window, total_limit, &block)
purge_by_date(older_than, "realtime", window, total_limit, &block)
end

def self.purge(older_than, interval, window = nil, total_limit = nil, &block)
purge_by_date(older_than, interval, window, total_limit, &block)
end

def self.purge_by_date(older_than, interval, window = nil, total_limit = nil, &block)
scope = purge_scope(older_than, interval)
window ||= purge_window_size
_log.info("Purging #{total_limit || "all"} #{interval} metrics older than [#{older_than}]...")
Expand Down
13 changes: 0 additions & 13 deletions app/models/miq_report_result/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@ def purge_window_size
::Settings.reporting.history.purge_window_size
end

def purge_timer
purge_queue(*purge_mode_and_value)
end

def purge_queue(mode, value)
MiqQueue.put_or_update(
:class_name => name,
:method_name => "purge",
:role => "reporting",
:queue_name => "reporting"
) { |_msg, item| item.merge(:args => [mode, value]) }
end

def purge_count(mode, value)
send("purge_count_by_#{mode}", value)
end
Expand Down
16 changes: 16 additions & 0 deletions app/models/mixins/purging_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ module PurgingMixin
extend ActiveSupport::Concern

module ClassMethods
def purge_mode_and_value
[:date, purge_date]
end

def purge_timer
purge_queue(*purge_mode_and_value)
end

def purge_queue(mode, value)
MiqQueue.put(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kbrock Role is not specified any more. Is it by intention?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Purging is quick. We perform it as pure sql and do not download tons of records.

So the thought is there is no reason to limit which workers can purge.
But that is my though, wanted your feedback here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, purging is a pure database function and can be done anywhere

:class_name => name,
:method_name => "purge_by_#{mode}",
:args => [value]
)
end

def purge(older_than = nil, window = nil, &block)
purge_by_date(older_than || purge_date, window || purge_window_size, &block)
end
Expand Down
10 changes: 0 additions & 10 deletions app/models/policy_event/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ def purge_window_size
::Settings.policy_events.history.purge_window_size
end

def purge_queue
MiqQueue.put_unless_exists(
:class_name => name,
:method_name => "purge",
:role => "event",
:queue_name => "ems"
)
end
alias_method :purge_timer, :purge_queue

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, is the commit message wrong here?

- converted put_or_update to put on general queue
- queuing directly into purge_by_date

Are we now using the mixin's method which does a put? All I see is deleted lines here, no new method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok, it's coming from the mixin. It wasn't clear when looking at the commit message.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for keeping me honest.

Yes, most of these come from the default implementation

def purge_scope(older_than)
where(arel_table[:timestamp].lt(older_than))
end
Expand Down
11 changes: 4 additions & 7 deletions app/models/vmdb_metric/purging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,11 @@ def purge_hourly_timer(ts = nil)
end

def purge_timer(value, interval)
MiqQueue.put_unless_exists(
MiqQueue.put(
:class_name => name,
:method_name => "purge_#{interval}",
:role => "database_operations",
:queue_name => "generic",
) do |_msg, find_options|
find_options.merge(:args => [value])
end
:args => [value]
)
end

def purge_window_size
Expand All @@ -59,7 +56,7 @@ def purge_daily(older_than, window = nil, &block)
# queue is calling purge_interval directly. (and mode is no longer used)
# keeping around in case messages are in the queue for upgrades
def purge(_mode, older_than, interval, window = nil, &block)
send("purge_#{interval}", older_than, window, &block)
purge_by_date(older_than, interval, window, &block)
end

private
Expand Down
21 changes: 4 additions & 17 deletions spec/models/drift_state/purging_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge"
:method_name => "purge_by_date"
)
expect(q.first.args[0]).to eq(:date)
expect(q.first.args[1]).to be_same_time_as 6.months.to_i.seconds.ago.utc
expect(q.first.args[0]).to be_same_time_as 6.months.to_i.seconds.ago.utc
end
end

Expand All @@ -54,20 +53,8 @@
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:args => [:remaining, 1]
)
end

it "with item already in the queue" do
described_class.purge_queue(:remaining, 2)

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:args => [:remaining, 2]
:method_name => "purge_by_remaining",
:args => [1]
)
end
end
Expand Down
24 changes: 5 additions & 19 deletions spec/models/event_stream/purging_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,29 @@
let(:purge_time) { (Time.zone.now + 10).round }

it "submits to the queue" do
described_class.purge_queue(purge_time)
expect(described_class).to receive(:purge_date).and_return(purge_time)
described_class.purge_timer

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:method_name => "purge_by_date",
:args => [purge_time]
)
end
end

context ".purge_date" do
it "using '3.month' syntax" do
allow(described_class).to receive_messages(:keep_events => "3.months")
stub_settings(:event_streams => {:history => {:keep_events => "3.months"}})

# Exposes 3.months.seconds.ago.utc != 3.months.ago.utc
expect(described_class.purge_date).to be_within(2.days).of(3.months.ago.utc)
end

it "defaults to 6 months" do
allow(described_class).to receive_messages(:keep_events => nil)
stub_settings(:event_streams => {:history => {:keep_events => nil}})
expect(described_class.purge_date).to be_within(1.day).of(6.months.ago.utc)
end
end
Expand All @@ -55,21 +56,6 @@ def assert_unpurged_ids(unpurged_ids)
described_class.purge(purge_date, 1)
assert_unpurged_ids(@new_event.id)
end

it "with a limit" do
described_class.purge(purge_date, nil, 1)
assert_unpurged_ids([@purge_date_event.id, @new_event.id])
end

it "with window > limit" do
described_class.purge(purge_date, 2, 1)
assert_unpurged_ids([@purge_date_event.id, @new_event.id])
end

it "with limit > window" do
described_class.purge(purge_date, 1, 2)
assert_unpurged_ids(@new_event.id)
end
end
end
end
21 changes: 4 additions & 17 deletions spec/models/miq_report_result/purging_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge"
:method_name => "purge_by_date"
)

expect(q.first.args[0]).to eq(:date)
expect(q.first.args[1]).to be_same_time_as 6.months.to_i.seconds.ago.utc
expect(q.first.args[0]).to be_same_time_as 6.months.to_i.seconds.ago.utc
end
end

Expand All @@ -83,20 +82,8 @@
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:args => [:remaining, 1]
)
end

it "with item already in the queue" do
described_class.purge_queue(:remaining, 2)

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge",
:args => [:remaining, 2]
:method_name => "purge_by_remaining",
:args => [1]
)
end
end
Expand Down
8 changes: 8 additions & 0 deletions spec/models/mixins/purging_mixin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
end
end

describe ".purge_mode_and_value" do
it "purge_mode_and_value should return proper options" do
stub_settings(:policy_events => {:history => {:keep_policy_events => 120}})
expect(example_class.purge_mode_and_value.first).to eq(:date)
expect(example_class.purge_mode_and_value.last).to be_within(1.second).of(120.seconds.ago.utc)
end
end

describe ".purge" do
let(:events) do
(-2..2).collect do |date_modifier|
Expand Down
18 changes: 3 additions & 15 deletions spec/models/policy_event/purging_spec.rb
Original file line number Diff line number Diff line change
@@ -1,30 +1,18 @@
describe PolicyEvent do
context "::Purging" do
context ".purge_queue" do
context ".purge_timer" do
before do
EvmSpecHelper.create_guid_miq_server_zone
end

it "with nothing in the queue" do
described_class.purge_queue
described_class.purge_timer

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge"
)
end

it "with item already in the queue" do
described_class.purge_queue
described_class.purge_queue

q = MiqQueue.all
expect(q.length).to eq(1)
expect(q.first).to have_attributes(
:class_name => described_class.name,
:method_name => "purge"
:method_name => "purge_by_date"
)
end
end
Expand Down
1 change: 0 additions & 1 deletion spec/models/vmdb_metric_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
it "should purge" do
expect do
VmdbMetric.purge_daily_timer
VmdbMetric.purge_daily_timer
end.to change { MiqQueue.count }.by(1)
end
end