diff --git a/lib/extensions/ar_migration.rb b/lib/extensions/ar_migration.rb new file mode 100644 index 00000000000..b925faa4e7e --- /dev/null +++ b/lib/extensions/ar_migration.rb @@ -0,0 +1,45 @@ +module MigrationSyncHelper + def self.migrations_column_present? + ActiveRecord::Base.connection.columns("miq_regions").detect { |c| c.name == "migrations_ran" } + end + + def self.wait_for_remote_region_migrations(subscription, version, wait_time = 1) + return unless MigrationSyncHelper.migrations_column_present? + region = MiqRegion.find(subscription.provider_region) + until region.migrations_ran.include?(version) + subscription.disable + subscription.enable + sleep(wait_time) + region.reload + end + end + + def self.update_local_migrations_ran(version, direction) + return unless MigrationSyncHelper.migrations_column_present? + return unless (region = MiqRegion.my_region) + + new_migrations = ActiveRecord::SchemaMigration.normalized_versions + new_migrations << version if direction == :up + migrations_value = ActiveRecord::Base.connection.quote(PG::TextEncoder::Array.new.encode(new_migrations)) + + ActiveRecord::Base.connection.exec_query(<<~SQL) + UPDATE #{MiqRegion.table_name} + SET migrations_ran = #{migrations_value} + WHERE id = #{region.id} + SQL + end +end + +module ArPglogicalMigration + def migrate(direction) + PglogicalSubscription.all.each do |s| + MigrationSyncHelper.wait_for_remote_region_migration(s, version.to_s) + end + + ret = super + MigrationSyncHelper.update_local_migrations_ran(version.to_s, direction) + ret + end +end + +ActiveRecord::Migration.prepend(ArPglogicalMigration) diff --git a/spec/lib/extensions/ar_migration_spec.rb b/spec/lib/extensions/ar_migration_spec.rb new file mode 100644 index 00000000000..48414c79364 --- /dev/null +++ b/spec/lib/extensions/ar_migration_spec.rb @@ -0,0 +1,87 @@ +describe MigrationSyncHelper do + let!(:my_region) do + MiqRegion.seed + MiqRegion.my_region + end + + context "without the migrations ran column" do + before do + column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) } + allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list) + end + + describe ".migrations_column_present?" do + it "is falsey" do + expect(described_class.migrations_column_present?).to be_falsey + end + end + + describe ".wait_for_remote_region_migrations" do + it "does nothing" do + expect(MiqRegion).not_to receive(:find) + described_class.wait_for_remote_region_migrations(double("subscription"), "12345") + end + end + + describe ".update_local_migrations_ran" do + it "does nothing" do + expect(MiqRegion).not_to receive(:my_region) + described_class.update_local_migrations_ran("12345", :up) + end + end + end + + describe ".migrations_column_present?" do + it "is truthy" do + # we never want to remove this column so we can just test directly + expect(described_class.migrations_column_present?).to be_truthy + end + end + + context "with a dummy version" do + let(:version) { "1234567890" } + + # sanity check - if this is somehow a version we have, these tests will make no sense + before { expect(my_region.migrations_ran).not_to include(version) } + + describe ".wait_for_remote_region_migrations" do + let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.id) } + + it "sleeps until the migration is added" do + # need to stub this because the thread uses a separate connection object which won't be in the same transaction + allow(MiqRegion).to receive(:find).with(my_region.id).and_return(my_region) + t = Thread.new do + Thread.current.abort_on_exception = true + described_class.wait_for_remote_region_migrations(subscription, version, 0) + end + + # Try to pass execution to the created thread + # NOTE: This is could definitely be a source of weird spec timing issues because + # we're relying on the thread scheduler to pass to the next thread + # when we sleep, but if this isn't here we likely won't execute the conditional + # block in .wait_for_remote_region_migrations + sleep 1 + + expect(t.alive?).to be true + my_region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version) + + # Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens + t = t.join(5) + expect(t.status).to be false + end + end + + describe ".update_local_migrations_ran" do + it "adds the given version when the direction is :up" do + described_class.update_local_migrations_ran(version, :up) + expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version) + end + + it "doesn't blow up when there is no region" do + MiqRegion.destroy_all + MiqRegion.my_region_clear_cache + described_class.update_local_migrations_ran(version, :up) + end + end + end +end