diff --git a/app/models/drift_state/purging.rb b/app/models/drift_state/purging.rb index a31f7e3b6d4..d861f7e8f9f 100644 --- a/app/models/drift_state/purging.rb +++ b/app/models/drift_state/purging.rb @@ -14,17 +14,6 @@ def purge_window_size ::Settings.drift_states.history.purge_window_size end - def purge_timer - purge_queue(*purge_mode_and_value) - end - - def purge_queue(mode, value) - MiqQueue.put_or_update( - :class_name => name, - :method_name => "purge", - ) { |_msg, item| item.merge(:args => [mode, value]) } - end - def purge_count(mode, value) send("purge_count_by_#{mode}", value) end diff --git a/app/models/event_stream/purging.rb b/app/models/event_stream/purging.rb index 68ca4d1d3a4..5baa19df321 100644 --- a/app/models/event_stream/purging.rb +++ b/app/models/event_stream/purging.rb @@ -1,6 +1,7 @@ class EventStream < ApplicationRecord module Purging extend ActiveSupport::Concern + include PurgingMixin module ClassMethods def keep_events @@ -14,33 +15,11 @@ def purge_date end def purge_window_size - ::Settings.event_streams.history.purge_window_size || 1000 + ::Settings.event_streams.history.purge_window_size end - def purge_timer - purge_queue(purge_date) - end - - def purge_queue(ts) - MiqQueue.put( - :class_name => name, - :method_name => "purge", - :role => "event", - :queue_name => "ems", - :args => [ts], - ) - end - - def purge(older_than, window = nil, limit = nil) - _log.info("Purging #{limit || "all"} events older than [#{older_than}]...") - - window ||= purge_window_size - - total = where(arel_table[:timestamp].lteq(older_than)).delete_in_batches(window, limit) do |count, _total| - _log.info("Purging #{count} events.") - end - - _log.info("Purging #{limit || "all"} events older than [#{older_than}]...Complete - Deleted #{total} records") + def purge_scope(older_than) + where(arel_table[:timestamp].lteq(older_than)) end end end diff --git a/app/models/metric/purging.rb b/app/models/metric/purging.rb index d33f09d7386..c7c5d4d6e74 100644 --- a/app/models/metric/purging.rb +++ b/app/models/metric/purging.rb @@ -36,14 +36,11 @@ def self.purge_realtime_timer(ts = nil) end def self.purge_timer(ts, interval) - MiqQueue.put_unless_exists( + MiqQueue.put( :class_name => name, :method_name => "purge_#{interval}", - :role => "ems_metrics_processor", - :queue_name => "ems_metrics_processor" - ) do |_msg, find_options| - find_options.merge(:args => [ts]) - end + :args => [ts], + ) end def self.purge_window_size @@ -75,18 +72,22 @@ def self.purge_associated_records(metric_type, ids) end def self.purge_daily(older_than, window = nil, total_limit = nil, &block) - purge(older_than, "daily", window, total_limit, &block) + purge_by_date(older_than, "daily", window, total_limit, &block) end def self.purge_hourly(older_than, window = nil, total_limit = nil, &block) - purge(older_than, "hourly", window, total_limit, &block) + purge_by_date(older_than, "hourly", window, total_limit, &block) end def self.purge_realtime(older_than, window = nil, total_limit = nil, &block) - purge(older_than, "realtime", window, total_limit, &block) + purge_by_date(older_than, "realtime", window, total_limit, &block) end def self.purge(older_than, interval, window = nil, total_limit = nil, &block) + purge_by_date(older_than, interval, window, total_limit, &block) + end + + def self.purge_by_date(older_than, interval, window = nil, total_limit = nil, &block) scope = purge_scope(older_than, interval) window ||= purge_window_size _log.info("Purging #{total_limit || "all"} #{interval} metrics older than [#{older_than}]...") diff --git a/app/models/miq_report_result/purging.rb b/app/models/miq_report_result/purging.rb index da757fcf373..934043aa9d1 100644 --- a/app/models/miq_report_result/purging.rb +++ b/app/models/miq_report_result/purging.rb @@ -14,19 +14,6 @@ def purge_window_size ::Settings.reporting.history.purge_window_size end - def purge_timer - purge_queue(*purge_mode_and_value) - end - - def purge_queue(mode, value) - MiqQueue.put_or_update( - :class_name => name, - :method_name => "purge", - :role => "reporting", - :queue_name => "reporting" - ) { |_msg, item| item.merge(:args => [mode, value]) } - end - def purge_count(mode, value) send("purge_count_by_#{mode}", value) end diff --git a/app/models/mixins/purging_mixin.rb b/app/models/mixins/purging_mixin.rb index 7721707da31..08fad135913 100644 --- a/app/models/mixins/purging_mixin.rb +++ b/app/models/mixins/purging_mixin.rb @@ -43,6 +43,22 @@ module PurgingMixin extend ActiveSupport::Concern module ClassMethods + def purge_mode_and_value + [:date, purge_date] + end + + def purge_timer + purge_queue(*purge_mode_and_value) + end + + def purge_queue(mode, value) + MiqQueue.put( + :class_name => name, + :method_name => "purge_by_#{mode}", + :args => [value] + ) + end + def purge(older_than = nil, window = nil, &block) purge_by_date(older_than || purge_date, window || purge_window_size, &block) end diff --git a/app/models/policy_event/purging.rb b/app/models/policy_event/purging.rb index f0f0303a053..b0036d3a83c 100644 --- a/app/models/policy_event/purging.rb +++ b/app/models/policy_event/purging.rb @@ -12,16 +12,6 @@ def purge_window_size ::Settings.policy_events.history.purge_window_size end - def purge_queue - MiqQueue.put_unless_exists( - :class_name => name, - :method_name => "purge", - :role => "event", - :queue_name => "ems" - ) - end - alias_method :purge_timer, :purge_queue - def purge_scope(older_than) where(arel_table[:timestamp].lt(older_than)) end diff --git a/app/models/vmdb_metric/purging.rb b/app/models/vmdb_metric/purging.rb index e65b568373e..250a7fe2af5 100644 --- a/app/models/vmdb_metric/purging.rb +++ b/app/models/vmdb_metric/purging.rb @@ -29,14 +29,11 @@ def purge_hourly_timer(ts = nil) end def purge_timer(value, interval) - MiqQueue.put_unless_exists( + MiqQueue.put( :class_name => name, :method_name => "purge_#{interval}", - :role => "database_operations", - :queue_name => "generic", - ) do |_msg, find_options| - find_options.merge(:args => [value]) - end + :args => [value] + ) end def purge_window_size @@ -59,7 +56,7 @@ def purge_daily(older_than, window = nil, &block) # queue is calling purge_interval directly. (and mode is no longer used) # keeping around in case messages are in the queue for upgrades def purge(_mode, older_than, interval, window = nil, &block) - send("purge_#{interval}", older_than, window, &block) + purge_by_date(older_than, interval, window, &block) end private diff --git a/spec/models/drift_state/purging_spec.rb b/spec/models/drift_state/purging_spec.rb index 558f4f7d53a..1444999ead3 100644 --- a/spec/models/drift_state/purging_spec.rb +++ b/spec/models/drift_state/purging_spec.rb @@ -36,10 +36,9 @@ expect(q.length).to eq(1) expect(q.first).to have_attributes( :class_name => described_class.name, - :method_name => "purge" + :method_name => "purge_by_date" ) - expect(q.first.args[0]).to eq(:date) - expect(q.first.args[1]).to be_same_time_as 6.months.to_i.seconds.ago.utc + expect(q.first.args[0]).to be_same_time_as 6.months.to_i.seconds.ago.utc end end @@ -54,20 +53,8 @@ expect(q.length).to eq(1) expect(q.first).to have_attributes( :class_name => described_class.name, - :method_name => "purge", - :args => [:remaining, 1] - ) - end - - it "with item already in the queue" do - described_class.purge_queue(:remaining, 2) - - q = MiqQueue.all - expect(q.length).to eq(1) - expect(q.first).to have_attributes( - :class_name => described_class.name, - :method_name => "purge", - :args => [:remaining, 2] + :method_name => "purge_by_remaining", + :args => [1] ) end end diff --git a/spec/models/event_stream/purging_spec.rb b/spec/models/event_stream/purging_spec.rb index 0c9c75a8b71..c7c3a4949eb 100644 --- a/spec/models/event_stream/purging_spec.rb +++ b/spec/models/event_stream/purging_spec.rb @@ -7,13 +7,14 @@ let(:purge_time) { (Time.zone.now + 10).round } it "submits to the queue" do - described_class.purge_queue(purge_time) + expect(described_class).to receive(:purge_date).and_return(purge_time) + described_class.purge_timer q = MiqQueue.all expect(q.length).to eq(1) expect(q.first).to have_attributes( :class_name => described_class.name, - :method_name => "purge", + :method_name => "purge_by_date", :args => [purge_time] ) end @@ -21,14 +22,14 @@ context ".purge_date" do it "using '3.month' syntax" do - allow(described_class).to receive_messages(:keep_events => "3.months") + stub_settings(:event_streams => {:history => {:keep_events => "3.months"}}) # Exposes 3.months.seconds.ago.utc != 3.months.ago.utc expect(described_class.purge_date).to be_within(2.days).of(3.months.ago.utc) end it "defaults to 6 months" do - allow(described_class).to receive_messages(:keep_events => nil) + stub_settings(:event_streams => {:history => {:keep_events => nil}}) expect(described_class.purge_date).to be_within(1.day).of(6.months.ago.utc) end end @@ -55,21 +56,6 @@ def assert_unpurged_ids(unpurged_ids) described_class.purge(purge_date, 1) assert_unpurged_ids(@new_event.id) end - - it "with a limit" do - described_class.purge(purge_date, nil, 1) - assert_unpurged_ids([@purge_date_event.id, @new_event.id]) - end - - it "with window > limit" do - described_class.purge(purge_date, 2, 1) - assert_unpurged_ids([@purge_date_event.id, @new_event.id]) - end - - it "with limit > window" do - described_class.purge(purge_date, 1, 2) - assert_unpurged_ids(@new_event.id) - end end end end diff --git a/spec/models/miq_report_result/purging_spec.rb b/spec/models/miq_report_result/purging_spec.rb index f4c3ae5cafe..84cf8e03810 100644 --- a/spec/models/miq_report_result/purging_spec.rb +++ b/spec/models/miq_report_result/purging_spec.rb @@ -64,11 +64,10 @@ expect(q.length).to eq(1) expect(q.first).to have_attributes( :class_name => described_class.name, - :method_name => "purge" + :method_name => "purge_by_date" ) - expect(q.first.args[0]).to eq(:date) - expect(q.first.args[1]).to be_same_time_as 6.months.to_i.seconds.ago.utc + expect(q.first.args[0]).to be_same_time_as 6.months.to_i.seconds.ago.utc end end @@ -83,20 +82,8 @@ expect(q.length).to eq(1) expect(q.first).to have_attributes( :class_name => described_class.name, - :method_name => "purge", - :args => [:remaining, 1] - ) - end - - it "with item already in the queue" do - described_class.purge_queue(:remaining, 2) - - q = MiqQueue.all - expect(q.length).to eq(1) - expect(q.first).to have_attributes( - :class_name => described_class.name, - :method_name => "purge", - :args => [:remaining, 2] + :method_name => "purge_by_remaining", + :args => [1] ) end end diff --git a/spec/models/mixins/purging_mixin_spec.rb b/spec/models/mixins/purging_mixin_spec.rb index d2be978364c..1ba7daae583 100644 --- a/spec/models/mixins/purging_mixin_spec.rb +++ b/spec/models/mixins/purging_mixin_spec.rb @@ -10,6 +10,14 @@ end end + describe ".purge_mode_and_value" do + it "purge_mode_and_value should return proper options" do + stub_settings(:policy_events => {:history => {:keep_policy_events => 120}}) + expect(example_class.purge_mode_and_value.first).to eq(:date) + expect(example_class.purge_mode_and_value.last).to be_within(1.second).of(120.seconds.ago.utc) + end + end + describe ".purge" do let(:events) do (-2..2).collect do |date_modifier| diff --git a/spec/models/policy_event/purging_spec.rb b/spec/models/policy_event/purging_spec.rb index f4e4356166c..3c1b5f10bfc 100644 --- a/spec/models/policy_event/purging_spec.rb +++ b/spec/models/policy_event/purging_spec.rb @@ -1,30 +1,18 @@ describe PolicyEvent do context "::Purging" do - context ".purge_queue" do + context ".purge_timer" do before do EvmSpecHelper.create_guid_miq_server_zone end it "with nothing in the queue" do - described_class.purge_queue + described_class.purge_timer q = MiqQueue.all expect(q.length).to eq(1) expect(q.first).to have_attributes( :class_name => described_class.name, - :method_name => "purge" - ) - end - - it "with item already in the queue" do - described_class.purge_queue - described_class.purge_queue - - q = MiqQueue.all - expect(q.length).to eq(1) - expect(q.first).to have_attributes( - :class_name => described_class.name, - :method_name => "purge" + :method_name => "purge_by_date" ) end end diff --git a/spec/models/vmdb_metric_spec.rb b/spec/models/vmdb_metric_spec.rb index e2881c21349..c6a8ea61eff 100644 --- a/spec/models/vmdb_metric_spec.rb +++ b/spec/models/vmdb_metric_spec.rb @@ -6,7 +6,6 @@ it "should purge" do expect do VmdbMetric.purge_daily_timer - VmdbMetric.purge_daily_timer end.to change { MiqQueue.count }.by(1) end end