Skip to content

Commit

Permalink
Refactor the extension to use a class for waiting on the migration
Browse files Browse the repository at this point in the history
  • Loading branch information
carbonin committed Sep 25, 2018
1 parent abe7106 commit d8d2913
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 86 deletions.
90 changes: 53 additions & 37 deletions lib/extensions/ar_migration.rb
Original file line number Diff line number Diff line change
@@ -1,43 +1,9 @@
module ArPglogicalMigration
class PglogicalMigrationHelper
module PglogicalMigrationHelper
def self.migrations_column_present?
ActiveRecord::Base.connection.columns("miq_regions").any? { |c| c.name == "migrations_ran" }
end

def self.log_and_print(message)
if @current_message == message
print "."
else
Vmdb.rails_logger.info(message)
print message
end
@current_message = message
end

class HelperARClass < ActiveRecord::Base; end

def self.restart_subscription(s)
c = HelperARClass.establish_connection.connection
c.pglogical.subscription_disable(s.id)
c.pglogical.subscription_enable(s.id)
ensure
HelperARClass.remove_connection
end

def self.wait_for_remote_region_migration(subscription, version, wait_time = 1)
return unless migrations_column_present?
region = MiqRegion.find_by(:region => subscription.provider_region)
waited = false
until region.migrations_ran&.include?(version)
waited = true
log_and_print("Waiting for remote region #{region.region} to run migration #{version}")
restart_subscription(subscription)
sleep(wait_time)
region.reload
end
puts "\n" if waited
end

def self.update_local_migrations_ran(version, direction)
return unless migrations_column_present?
return unless (region = MiqRegion.my_region)
Expand All @@ -54,13 +20,63 @@ def self.update_local_migrations_ran(version, direction)
end
end

class RemoteRegionMigrationWatcher
class HelperARClass < ActiveRecord::Base; end

attr_reader :region, :subscription, :version

def initialize(subscription, version)
@region = MiqRegion.find_by(:region => subscription.provider_region)
@subscription = subscription
@version = version
end

def wait_for_remote_region_migration(wait_time = 1)
return unless wait_for_migration?

Vmdb.rails_logger.info(wait_message)
print(wait_message)

while wait_for_migration?
print(".")
restart_subscription
sleep(wait_time)
region.reload
end

puts("\n")
end

private

def wait_for_migration?
migrations_column_present? ? !region.migrations_ran&.include?(version) : false
end

def migrations_column_present?
@migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present?
end

def wait_message
@wait_message ||= "Waiting for remote region #{region.region} to run migration #{version}"
end

def restart_subscription
c = HelperARClass.establish_connection.connection
c.pglogical.subscription_disable(subscription.id)
c.pglogical.subscription_enable(subscription.id)
ensure
HelperARClass.remove_connection
end
end

def migrate(direction)
PglogicalSubscription.all.each do |s|
ArPglogicalMigrationHelper.wait_for_remote_region_migration(s, version.to_s)
RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
end

ret = super
ArPglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
ret
end
end
Expand Down
116 changes: 67 additions & 49 deletions spec/lib/extensions/ar_migration_spec.rb
Original file line number Diff line number Diff line change
@@ -1,63 +1,94 @@
describe ArPglogicalMigration::PglogicalMigrationHelper do
shared_context "without the migrations ran column" do
before do
column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) }
allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list)
end
end

shared_context "with a dummy version" do
let(:version) { "1234567890" }

# sanity check - if this is somehow a version we have, these tests will make no sense
before { expect(my_region.migrations_ran).not_to include(version) }
end

context "with a region seeded" do
let!(:my_region) do
MiqRegion.seed
MiqRegion.my_region
end

before { allow(described_class).to receive_messages(:puts => nil, :print => nil) }
describe ArPglogicalMigration::PglogicalMigrationHelper do
context "without the migrations ran column" do
include_context "without the migrations ran column"

context "without the migrations ran column" do
before do
column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) }
allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list)
end
describe ".migrations_column_present?" do
it "is falsey" do
expect(described_class.migrations_column_present?).to be_falsey
end
end

describe ".migrations_column_present?" do
it "is falsey" do
expect(described_class.migrations_column_present?).to be_falsey
describe ".update_local_migrations_ran" do
it "does nothing" do
expect(MiqRegion).not_to receive(:my_region)
described_class.update_local_migrations_ran("12345", :up)
end
end
end

describe ".wait_for_remote_region_migrations" do
it "does nothing" do
expect(MiqRegion).not_to receive(:find)
described_class.wait_for_remote_region_migration(double("subscription"), "12345")
describe ".migrations_column_present?" do
it "is truthy" do
# we never want to remove this column so we can just test directly
expect(described_class.migrations_column_present?).to be_truthy
end
end

describe ".update_local_migrations_ran" do
it "does nothing" do
expect(MiqRegion).not_to receive(:my_region)
described_class.update_local_migrations_ran("12345", :up)
include_context "with a dummy version"

it "adds the given version when the direction is :up" do
described_class.update_local_migrations_ran(version, :up)
expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version)
end
end
end

describe ".migrations_column_present?" do
it "is truthy" do
# we never want to remove this column so we can just test directly
expect(described_class.migrations_column_present?).to be_truthy
it "doesn't blow up when there is no region" do
MiqRegion.destroy_all
MiqRegion.my_region_clear_cache
described_class.update_local_migrations_ran(version, :up)
end
end
end

context "with a dummy version" do
let(:version) { "1234567890" }
describe ArPglogicalMigration::RemoteRegionMigrationWatcher do
include_context "with a dummy version"

let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.region) }

# sanity check - if this is somehow a version we have, these tests will make no sense
before { expect(my_region.migrations_ran).not_to include(version) }
subject do
described_class.new(subscription, version).tap do |s|
allow(s).to receive_messages(:puts => nil, :print => nil)
end
end

describe ".wait_for_remote_region_migration" do
let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.region) }
describe "#wait_for_remote_region_migrations" do
context "without the migrations ran column present" do
include_context "without the migrations ran column"

it "does nothing" do
expect(Vmdb.rails_logger).not_to receive(:info)
subject.wait_for_remote_region_migration
end
end

it "sleeps until the migration is added" do
allow(described_class).to receive(:restart_subscription)
my_region.update_attributes!(:migrations_ran => nil)
allow(subject).to receive(:restart_subscription)
allow(subject.region).to receive(:reload)

subject.region.update_attributes!(:migrations_ran => nil)

t = Thread.new do
Thread.current.abort_on_exception = true
# need to stub these because the thread uses a separate connection object which won't be in the same transaction
allow(MiqRegion).to receive(:find_by).with(:region => my_region.region).and_return(my_region)
allow(my_region).to receive(:reload)
described_class.wait_for_remote_region_migration(subscription, version, 0)
subject.wait_for_remote_region_migration(0)
end

# Try to pass execution to the created thread
Expand All @@ -68,25 +99,12 @@
sleep 1

expect(t.alive?).to be true
my_region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version)
subject.region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version)

# Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens
t = t.join(5)
expect(t.status).to be false
end
end

describe ".update_local_migrations_ran" do
it "adds the given version when the direction is :up" do
described_class.update_local_migrations_ran(version, :up)
expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version)
end

it "doesn't blow up when there is no region" do
MiqRegion.destroy_all
MiqRegion.my_region_clear_cache
described_class.update_local_migrations_ran(version, :up)
end
end
end
end

0 comments on commit d8d2913

Please sign in to comment.