Skip to content

Commit

Permalink
Merge pull request #18393 from bdunne/migration_bz_1668800
Browse files Browse the repository at this point in the history
Use the new schema_migrations_ran table to track remote schema migrations
  • Loading branch information
carbonin authored Jan 29, 2019
2 parents c7bf786 + 8fd91be commit 8cca781
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 123 deletions.
81 changes: 36 additions & 45 deletions lib/extensions/ar_migration.rb
Original file line number Diff line number Diff line change
@@ -1,48 +1,36 @@
module ArPglogicalMigration
module PglogicalMigrationHelper
def self.migrations_column_present?
ActiveRecord::Base.connection.columns("miq_regions").any? { |c| c.name == "migrations_ran" }
end

def self.my_region_number
# Use ApplicationRecord here because we need to query region information
@my_region_number ||= ApplicationRecord.my_region_number
end

def self.my_region_created?
ActiveRecord::Base.connection.exec_query(<<~SQL).first["exists"]
SELECT EXISTS(
SELECT id FROM miq_regions WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)}
)
SQL
module ArPglogicalMigrationHelper
def self.discover_schema_migrations_ran_class
return unless ActiveRecord::Base.connection.table_exists?("schema_migrations_ran")
Class.new(ActiveRecord::Base) do
require 'active_record-id_regions'
include ActiveRecord::IdRegions
self.table_name = "schema_migrations_ran"
default_scope { in_my_region }
end
end

def self.update_local_migrations_ran(version, direction)
return unless migrations_column_present?
return unless my_region_created?
def self.update_local_migrations_ran(version, direction)
return unless schema_migrations_ran_class = discover_schema_migrations_ran_class

new_migrations = ActiveRecord::SchemaMigration.normalized_versions
new_migrations << version if direction == :up
migrations_value = ActiveRecord::Base.connection.quote(PG::TextEncoder::Array.new.encode(new_migrations))
new_migrations = ActiveRecord::SchemaMigration.normalized_versions
new_migrations << version if direction == :up

ActiveRecord::Base.connection.exec_query(<<~SQL)
UPDATE miq_regions
SET migrations_ran = #{migrations_value}
WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)}
SQL
(new_migrations - schema_migrations_ran_class.pluck(:version)).each do |v|
schema_migrations_ran_class.find_or_create_by(:version => v)
end

schema_migrations_ran_class.where(:version => version).delete_all if direction == :down
end

class RemoteRegionMigrationWatcher
class HelperARClass < ActiveRecord::Base; end
class SubscriptionHelper < ActiveRecord::Base; end

attr_reader :region, :subscription, :version
attr_reader :subscription, :version, :schema_migrations_ran_class

def initialize(subscription, version)
region_class = Class.new(ActiveRecord::Base) { self.table_name = "miq_regions" }
@region = region_class.find_by(:region => subscription.provider_region)
@subscription = subscription
@version = version
@schema_migrations_ran_class = ArPglogicalMigrationHelper.discover_schema_migrations_ran_class
@subscription = subscription
@version = version
end

def wait_for_remote_region_migration(wait_time = 1)
Expand All @@ -55,42 +43,45 @@ def wait_for_remote_region_migration(wait_time = 1)
print(".")
restart_subscription
sleep(wait_time)
region.reload
end

puts("\n")
end

private

def wait_for_migration?
migrations_column_present? ? !region.migrations_ran&.include?(version) : false
def region_number
subscription.provider_region
end

def migrations_column_present?
@migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present?
def wait_for_migration?
return false unless schema_migrations_ran_class
# We need to unscope here since in_region doesn't override the default scope of in_my_region
# see https://github.com/ManageIQ/activerecord-id_regions/issues/11
!schema_migrations_ran_class.unscoped.in_region(region_number).where(:version => version).exists?
end

def wait_message
@wait_message ||= "Waiting for remote region #{region.region} to run migration #{version}"
@wait_message ||= "Waiting for remote region #{region_number} to run migration #{version}"
end

def restart_subscription
c = HelperARClass.establish_connection.connection
c = SubscriptionHelper.establish_connection.connection
c.pglogical.subscription_disable(subscription.id)
c.pglogical.subscription_enable(subscription.id)
ensure
HelperARClass.remove_connection
SubscriptionHelper.remove_connection
end
end
end

module ArPglogicalMigration
def migrate(direction)
PglogicalSubscription.all.each do |s|
RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
ArPglogicalMigrationHelper::RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
end

ret = super
PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
ArPglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
ret
end
end
Expand Down
139 changes: 61 additions & 78 deletions spec/lib/extensions/ar_migration_spec.rb
Original file line number Diff line number Diff line change
@@ -1,109 +1,92 @@
shared_context "without the migrations ran column" do
before do
column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) }
allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list)
describe ArPglogicalMigrationHelper do
shared_context "without the schema_migrations_ran table" do
before do
allow(ActiveRecord::Base.connection).to receive(:table_exists?).with("schema_migrations_ran").and_return(false)
end
end
end

shared_context "with a dummy version" do
let(:version) { "1234567890" }

# sanity check - if this is somehow a version we have, these tests will make no sense
before { expect(my_region.migrations_ran).not_to include(version) }
end
shared_context "with a dummy version" do
let(:version) { "1234567890" }

context "with a region seeded" do
let!(:my_region) do
MiqRegion.seed
MiqRegion.my_region
# sanity check - if this is somehow a version we have, these tests will make no sense
before { expect(ActiveRecord::SchemaMigration.normalized_versions).not_to include(version) }
end

describe ArPglogicalMigration::PglogicalMigrationHelper do
context "without the migrations ran column" do
include_context "without the migrations ran column"
context "with a region seeded" do
let!(:my_region) do
MiqRegion.seed
MiqRegion.my_region
end

describe ".migrations_column_present?" do
it "is falsey" do
expect(described_class.migrations_column_present?).to be_falsey
end
end
describe ".update_local_migrations_ran" do
context "without the schema_migrations_ran table" do
include_context "without the schema_migrations_ran table"

describe ".update_local_migrations_ran" do
it "does nothing" do
expect(ActiveRecord::SchemaMigration).not_to receive(:normalized_versions)
described_class.update_local_migrations_ran("12345", :up)
end
end
end

describe ".migrations_column_present?" do
it "is truthy" do
# we never want to remove this column so we can just test directly
expect(described_class.migrations_column_present?).to be_truthy
end
end

describe ".update_local_migrations_ran" do
include_context "with a dummy version"

it "adds the given version when the direction is :up" do
described_class.update_local_migrations_ran(version, :up)
expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version)
end

it "doesn't blow up when there is no region" do
MiqRegion.destroy_all
MiqRegion.my_region_clear_cache
described_class.update_local_migrations_ran(version, :up)
end
end
end

describe ArPglogicalMigration::RemoteRegionMigrationWatcher do
include_context "with a dummy version"
context "with the schema_migrations_ran table" do
include_context "with a dummy version"

let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.region) }
it "adds the given version when the direction is :up" do
described_class.update_local_migrations_ran(version, :up)
expect(described_class.discover_schema_migrations_ran_class.where(:version => version).exists?).to eq(true)
end

subject do
described_class.new(subscription, version).tap do |s|
allow(s).to receive_messages(:puts => nil, :print => nil)
it "doesn't blow up when there is no region" do
MiqRegion.destroy_all
MiqRegion.my_region_clear_cache
described_class.update_local_migrations_ran(version, :up)
end
end
end

describe "#wait_for_remote_region_migrations" do
context "without the migrations ran column present" do
include_context "without the migrations ran column"
describe ArPglogicalMigrationHelper::RemoteRegionMigrationWatcher do
include_context "with a dummy version"
let(:helper_class) { Class.new(ActiveRecord::Base) { include ActiveRecord::IdRegions } }
let(:other_region_number) { helper_class.my_region_number + rand(1..50) }
let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => other_region_number) }

it "does nothing" do
expect(Vmdb.rails_logger).not_to receive(:info)
subject.wait_for_remote_region_migration
subject do
described_class.new(subscription, version).tap do |s|
allow(s).to receive_messages(:puts => nil, :print => nil)
end
end

it "sleeps until the migration is added" do
allow(subject).to receive(:restart_subscription)
allow(subject.region).to receive(:reload)
describe "#wait_for_remote_region_migration" do
def wait_for_migration_called
@count ||= 0
if @count == 5
ArPglogicalMigrationHelper.discover_schema_migrations_ran_class.create!(:id => helper_class.id_in_region(1, other_region_number), :version => version)
end
@count += 1
end

subject.region.update_attributes!(:migrations_ran => nil)
context "without the schema_migrations_ran table present" do
include_context "without the schema_migrations_ran table"

t = Thread.new do
Thread.current.abort_on_exception = true
subject.wait_for_remote_region_migration(0)
it "does nothing" do
expect(Vmdb.rails_logger).not_to receive(:info)
subject.wait_for_remote_region_migration
end
end

# Try to pass execution to the created thread
# NOTE: This is could definitely be a source of weird spec timing issues because
# we're relying on the thread scheduler to pass to the next thread
# when we sleep, but if this isn't here we likely won't execute the conditional
# block in .wait_for_remote_region_migrations
sleep 1
it "waits for the migration to be added" do
allow(subject).to receive(:restart_subscription)
expect(ArPglogicalMigrationHelper.discover_schema_migrations_ran_class.unscoped.where(:version => version).exists?).to eq(false)

expect(t.alive?).to be true
subject.region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version)
allow(subject).to receive(:wait_for_migration?).and_wrap_original do |m, _args|
wait_for_migration_called
m.call
end

# Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens
t = t.join(5)
expect(t.status).to be false
subject.wait_for_remote_region_migration(0)

expect(ArPglogicalMigrationHelper.discover_schema_migrations_ran_class.unscoped.where(:version => version).exists?).to eq(true)
end
end
end
end
Expand Down

0 comments on commit 8cca781

Please sign in to comment.