diff --git a/lib/extensions/ar_migration.rb b/lib/extensions/ar_migration.rb index c66ecb19ea49..720af38c2d8e 100644 --- a/lib/extensions/ar_migration.rb +++ b/lib/extensions/ar_migration.rb @@ -1,43 +1,9 @@ module ArPglogicalMigration - class PglogicalMigrationHelper + module PglogicalMigrationHelper def self.migrations_column_present? ActiveRecord::Base.connection.columns("miq_regions").any? { |c| c.name == "migrations_ran" } end - def self.log_and_print(message) - if @current_message == message - print "." - else - Vmdb.rails_logger.info(message) - print message - end - @current_message = message - end - - class HelperARClass < ActiveRecord::Base; end - - def self.restart_subscription(s) - c = HelperARClass.establish_connection.connection - c.pglogical.subscription_disable(s.id) - c.pglogical.subscription_enable(s.id) - ensure - HelperARClass.remove_connection - end - - def self.wait_for_remote_region_migration(subscription, version, wait_time = 1) - return unless migrations_column_present? - region = MiqRegion.find_by(:region => subscription.provider_region) - waited = false - until region.migrations_ran&.include?(version) - waited = true - log_and_print("Waiting for remote region #{region.region} to run migration #{version}") - restart_subscription(subscription) - sleep(wait_time) - region.reload - end - puts "\n" if waited - end - def self.update_local_migrations_ran(version, direction) return unless migrations_column_present? return unless (region = MiqRegion.my_region) @@ -54,13 +20,63 @@ def self.update_local_migrations_ran(version, direction) end end + class RemoteRegionMigrationWatcher + class HelperARClass < ActiveRecord::Base; end + + attr_reader :region, :subscription, :version + + def initialize(subscription, version) + @region = MiqRegion.find_by(:region => subscription.provider_region) + @subscription = subscription + @version = version + end + + def wait_for_remote_region_migration(wait_time = 1) + return unless wait_for_migration? + + Vmdb.rails_logger.info(wait_message) + print(wait_message) + + while wait_for_migration? + print(".") + restart_subscription + sleep(wait_time) + region.reload + end + + puts("\n") + end + + private + + def wait_for_migration? + migrations_column_present? ? !region.migrations_ran&.include?(version) : false + end + + def migrations_column_present? + @migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present? + end + + def wait_message + @wait_message ||= "Waiting for remote region #{region.region} to run migration #{version}" + end + + def restart_subscription + c = HelperARClass.establish_connection.connection + c.pglogical.subscription_disable(subscription.id) + c.pglogical.subscription_enable(subscription.id) + ensure + HelperARClass.remove_connection + end + end + def migrate(direction) PglogicalSubscription.all.each do |s| - ArPglogicalMigrationHelper.wait_for_remote_region_migration(s, version.to_s) + RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration end ret = super - ArPglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction) + PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction) ret end end diff --git a/spec/lib/extensions/ar_migration_spec.rb b/spec/lib/extensions/ar_migration_spec.rb index 929f11d0945b..37371fb767e7 100644 --- a/spec/lib/extensions/ar_migration_spec.rb +++ b/spec/lib/extensions/ar_migration_spec.rb @@ -1,63 +1,94 @@ -describe ArPglogicalMigration::PglogicalMigrationHelper do +shared_context "without the migrations ran column" do + before do + column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) } + allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list) + end +end + +shared_context "with a dummy version" do + let(:version) { "1234567890" } + + # sanity check - if this is somehow a version we have, these tests will make no sense + before { expect(my_region.migrations_ran).not_to include(version) } +end + +context "with a region seeded" do let!(:my_region) do MiqRegion.seed MiqRegion.my_region end - before { allow(described_class).to receive_messages(:puts => nil, :print => nil) } + describe ArPglogicalMigration::PglogicalMigrationHelper do + context "without the migrations ran column" do + include_context "without the migrations ran column" - context "without the migrations ran column" do - before do - column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) } - allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list) - end + describe ".migrations_column_present?" do + it "is falsey" do + expect(described_class.migrations_column_present?).to be_falsey + end + end - describe ".migrations_column_present?" do - it "is falsey" do - expect(described_class.migrations_column_present?).to be_falsey + describe ".update_local_migrations_ran" do + it "does nothing" do + expect(MiqRegion).not_to receive(:my_region) + described_class.update_local_migrations_ran("12345", :up) + end end end - describe ".wait_for_remote_region_migrations" do - it "does nothing" do - expect(MiqRegion).not_to receive(:find) - described_class.wait_for_remote_region_migration(double("subscription"), "12345") + describe ".migrations_column_present?" do + it "is truthy" do + # we never want to remove this column so we can just test directly + expect(described_class.migrations_column_present?).to be_truthy end end describe ".update_local_migrations_ran" do - it "does nothing" do - expect(MiqRegion).not_to receive(:my_region) - described_class.update_local_migrations_ran("12345", :up) + include_context "with a dummy version" + + it "adds the given version when the direction is :up" do + described_class.update_local_migrations_ran(version, :up) + expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version) end - end - end - describe ".migrations_column_present?" do - it "is truthy" do - # we never want to remove this column so we can just test directly - expect(described_class.migrations_column_present?).to be_truthy + it "doesn't blow up when there is no region" do + MiqRegion.destroy_all + MiqRegion.my_region_clear_cache + described_class.update_local_migrations_ran(version, :up) + end end end - context "with a dummy version" do - let(:version) { "1234567890" } + describe ArPglogicalMigration::RemoteRegionMigrationWatcher do + include_context "with a dummy version" + + let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.region) } - # sanity check - if this is somehow a version we have, these tests will make no sense - before { expect(my_region.migrations_ran).not_to include(version) } + subject do + described_class.new(subscription, version).tap do |s| + allow(s).to receive_messages(:puts => nil, :print => nil) + end + end - describe ".wait_for_remote_region_migration" do - let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.region) } + describe "#wait_for_remote_region_migrations" do + context "without the migrations ran column present" do + include_context "without the migrations ran column" + + it "does nothing" do + expect(Vmdb.rails_logger).not_to receive(:info) + subject.wait_for_remote_region_migration + end + end it "sleeps until the migration is added" do - allow(described_class).to receive(:restart_subscription) - my_region.update_attributes!(:migrations_ran => nil) + allow(subject).to receive(:restart_subscription) + allow(subject.region).to receive(:reload) + + subject.region.update_attributes!(:migrations_ran => nil) + t = Thread.new do Thread.current.abort_on_exception = true - # need to stub these because the thread uses a separate connection object which won't be in the same transaction - allow(MiqRegion).to receive(:find_by).with(:region => my_region.region).and_return(my_region) - allow(my_region).to receive(:reload) - described_class.wait_for_remote_region_migration(subscription, version, 0) + subject.wait_for_remote_region_migration(0) end # Try to pass execution to the created thread @@ -68,25 +99,12 @@ sleep 1 expect(t.alive?).to be true - my_region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version) + subject.region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version) # Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens t = t.join(5) expect(t.status).to be false end end - - describe ".update_local_migrations_ran" do - it "adds the given version when the direction is :up" do - described_class.update_local_migrations_ran(version, :up) - expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version) - end - - it "doesn't blow up when there is no region" do - MiqRegion.destroy_all - MiqRegion.my_region_clear_cache - described_class.update_local_migrations_ran(version, :up) - end - end end end