-
Notifications
You must be signed in to change notification settings - Fork 897
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #17919 from carbonin/replication_migration_extension
Add a patch to ActiveRecord::Migration for tracking replicated migrations
- Loading branch information
Showing
4 changed files
with
217 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
module ArPglogicalMigration | ||
module PglogicalMigrationHelper | ||
def self.migrations_column_present? | ||
ActiveRecord::Base.connection.columns("miq_regions").any? { |c| c.name == "migrations_ran" } | ||
end | ||
|
||
def self.my_region_number | ||
# Use ApplicationRecord here because we need to query region information | ||
@my_region_number ||= ApplicationRecord.my_region_number | ||
end | ||
|
||
def self.my_region_created? | ||
ActiveRecord::Base.connection.exec_query(<<~SQL).first["exists"] | ||
SELECT EXISTS( | ||
SELECT id FROM miq_regions WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)} | ||
) | ||
SQL | ||
end | ||
|
||
def self.update_local_migrations_ran(version, direction) | ||
return unless migrations_column_present? | ||
return unless my_region_created? | ||
|
||
new_migrations = ActiveRecord::SchemaMigration.normalized_versions | ||
new_migrations << version if direction == :up | ||
migrations_value = ActiveRecord::Base.connection.quote(PG::TextEncoder::Array.new.encode(new_migrations)) | ||
|
||
ActiveRecord::Base.connection.exec_query(<<~SQL) | ||
UPDATE miq_regions | ||
SET migrations_ran = #{migrations_value} | ||
WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)} | ||
SQL | ||
end | ||
end | ||
|
||
class RemoteRegionMigrationWatcher | ||
class HelperARClass < ActiveRecord::Base; end | ||
|
||
attr_reader :region, :subscription, :version | ||
|
||
def initialize(subscription, version) | ||
region_class = Class.new(ActiveRecord::Base) { self.table_name = "miq_regions" } | ||
@region = region_class.find_by(:region => subscription.provider_region) | ||
@subscription = subscription | ||
@version = version | ||
end | ||
|
||
def wait_for_remote_region_migration(wait_time = 1) | ||
return unless wait_for_migration? | ||
|
||
Vmdb.rails_logger.info(wait_message) | ||
print(wait_message) | ||
|
||
while wait_for_migration? | ||
print(".") | ||
restart_subscription | ||
sleep(wait_time) | ||
region.reload | ||
end | ||
|
||
puts("\n") | ||
end | ||
|
||
private | ||
|
||
def wait_for_migration? | ||
migrations_column_present? ? !region.migrations_ran&.include?(version) : false | ||
end | ||
|
||
def migrations_column_present? | ||
@migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present? | ||
end | ||
|
||
def wait_message | ||
@wait_message ||= "Waiting for remote region #{region.region} to run migration #{version}" | ||
end | ||
|
||
def restart_subscription | ||
c = HelperARClass.establish_connection.connection | ||
c.pglogical.subscription_disable(subscription.id) | ||
c.pglogical.subscription_enable(subscription.id) | ||
ensure | ||
HelperARClass.remove_connection | ||
end | ||
end | ||
|
||
def migrate(direction) | ||
PglogicalSubscription.all.each do |s| | ||
RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration | ||
end | ||
|
||
ret = super | ||
PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction) | ||
ret | ||
end | ||
end | ||
|
||
ActiveRecord::Migration.prepend(ArPglogicalMigration) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
shared_context "without the migrations ran column" do | ||
before do | ||
column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) } | ||
allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list) | ||
end | ||
end | ||
|
||
shared_context "with a dummy version" do | ||
let(:version) { "1234567890" } | ||
|
||
# sanity check - if this is somehow a version we have, these tests will make no sense | ||
before { expect(my_region.migrations_ran).not_to include(version) } | ||
end | ||
|
||
context "with a region seeded" do | ||
let!(:my_region) do | ||
MiqRegion.seed | ||
MiqRegion.my_region | ||
end | ||
|
||
describe ArPglogicalMigration::PglogicalMigrationHelper do | ||
context "without the migrations ran column" do | ||
include_context "without the migrations ran column" | ||
|
||
describe ".migrations_column_present?" do | ||
it "is falsey" do | ||
expect(described_class.migrations_column_present?).to be_falsey | ||
end | ||
end | ||
|
||
describe ".update_local_migrations_ran" do | ||
it "does nothing" do | ||
expect(ActiveRecord::SchemaMigration).not_to receive(:normalized_versions) | ||
described_class.update_local_migrations_ran("12345", :up) | ||
end | ||
end | ||
end | ||
|
||
describe ".migrations_column_present?" do | ||
it "is truthy" do | ||
# we never want to remove this column so we can just test directly | ||
expect(described_class.migrations_column_present?).to be_truthy | ||
end | ||
end | ||
|
||
describe ".update_local_migrations_ran" do | ||
include_context "with a dummy version" | ||
|
||
it "adds the given version when the direction is :up" do | ||
described_class.update_local_migrations_ran(version, :up) | ||
expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version) | ||
end | ||
|
||
it "doesn't blow up when there is no region" do | ||
MiqRegion.destroy_all | ||
MiqRegion.my_region_clear_cache | ||
described_class.update_local_migrations_ran(version, :up) | ||
end | ||
end | ||
end | ||
|
||
describe ArPglogicalMigration::RemoteRegionMigrationWatcher do | ||
include_context "with a dummy version" | ||
|
||
let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.region) } | ||
|
||
subject do | ||
described_class.new(subscription, version).tap do |s| | ||
allow(s).to receive_messages(:puts => nil, :print => nil) | ||
end | ||
end | ||
|
||
describe "#wait_for_remote_region_migrations" do | ||
context "without the migrations ran column present" do | ||
include_context "without the migrations ran column" | ||
|
||
it "does nothing" do | ||
expect(Vmdb.rails_logger).not_to receive(:info) | ||
subject.wait_for_remote_region_migration | ||
end | ||
end | ||
|
||
it "sleeps until the migration is added" do | ||
allow(subject).to receive(:restart_subscription) | ||
allow(subject.region).to receive(:reload) | ||
|
||
subject.region.update_attributes!(:migrations_ran => nil) | ||
|
||
t = Thread.new do | ||
Thread.current.abort_on_exception = true | ||
subject.wait_for_remote_region_migration(0) | ||
end | ||
|
||
# Try to pass execution to the created thread | ||
# NOTE: This is could definitely be a source of weird spec timing issues because | ||
# we're relying on the thread scheduler to pass to the next thread | ||
# when we sleep, but if this isn't here we likely won't execute the conditional | ||
# block in .wait_for_remote_region_migrations | ||
sleep 1 | ||
|
||
expect(t.alive?).to be true | ||
subject.region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version) | ||
|
||
# Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens | ||
t = t.join(5) | ||
expect(t.status).to be false | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters