-
Notifications
You must be signed in to change notification settings - Fork 900
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Patch ActiveRecord::Migration#migrate to ensure global region schema …
…consistency This patch does two things. 1. It maintains the migration list in MiqRegion#migrations_ran as each migration is completed 2. It halts the global region migration process if the changes from the same migration in all the remote regions have not been replicated This combination will allow us to ensure that the global region will never attempt to replicate data which would be inconsistent with the current schema. This patch uses SQL directly to do the update because the migration which adds the migrations_ran column was not able to utilize the attribute for the newly added column. We were not able to refresh the attributes cache of the MiqRegion model from within the #migrate method for some reason, but were able to detect if the column was present and update it while bypassing the AR caches. This seemed to be the more sane route given the issues we've had with model cache clearing in the past. Fixes https://bugzilla.redhat.com/show_bug.cgi?id=1601015
- Loading branch information
Showing
2 changed files
with
132 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
module MigrationSyncHelper | ||
def self.migrations_column_present? | ||
ActiveRecord::Base.connection.columns("miq_regions").detect { |c| c.name == "migrations_ran" } | ||
end | ||
|
||
def self.wait_for_remote_region_migrations(subscription, version, wait_time = 1) | ||
return unless MigrationSyncHelper.migrations_column_present? | ||
region = MiqRegion.find(subscription.provider_region) | ||
until region.migrations_ran.include?(version) | ||
subscription.disable | ||
subscription.enable | ||
sleep(wait_time) | ||
region.reload | ||
end | ||
end | ||
|
||
def self.update_local_migrations_ran(version, direction) | ||
return unless MigrationSyncHelper.migrations_column_present? | ||
return unless (region = MiqRegion.my_region) | ||
|
||
new_migrations = ActiveRecord::SchemaMigration.normalized_versions | ||
new_migrations << version if direction == :up | ||
migrations_value = ActiveRecord::Base.connection.quote(PG::TextEncoder::Array.new.encode(new_migrations)) | ||
|
||
ActiveRecord::Base.connection.exec_query(<<~SQL) | ||
UPDATE #{MiqRegion.table_name} | ||
SET migrations_ran = #{migrations_value} | ||
WHERE id = #{region.id} | ||
SQL | ||
end | ||
end | ||
|
||
module ArPglogicalMigration | ||
def migrate(direction) | ||
PglogicalSubscription.all.each do |s| | ||
MigrationSyncHelper.wait_for_remote_region_migration(s, version.to_s) | ||
end | ||
|
||
ret = super | ||
MigrationSyncHelper.update_local_migrations_ran(version.to_s, direction) | ||
ret | ||
end | ||
end | ||
|
||
ActiveRecord::Migration.prepend(ArPglogicalMigration) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
describe MigrationSyncHelper do | ||
let!(:my_region) do | ||
MiqRegion.seed | ||
MiqRegion.my_region | ||
end | ||
|
||
context "without the migrations ran column" do | ||
before do | ||
column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) } | ||
allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list) | ||
end | ||
|
||
describe ".migrations_column_present?" do | ||
it "is falsey" do | ||
expect(described_class.migrations_column_present?).to be_falsey | ||
end | ||
end | ||
|
||
describe ".wait_for_remote_region_migrations" do | ||
it "does nothing" do | ||
expect(MiqRegion).not_to receive(:find) | ||
described_class.wait_for_remote_region_migrations(double("subscription"), "12345") | ||
end | ||
end | ||
|
||
describe ".update_local_migrations_ran" do | ||
it "does nothing" do | ||
expect(MiqRegion).not_to receive(:my_region) | ||
described_class.update_local_migrations_ran("12345", :up) | ||
end | ||
end | ||
end | ||
|
||
describe ".migrations_column_present?" do | ||
it "is truthy" do | ||
# we never want to remove this column so we can just test directly | ||
expect(described_class.migrations_column_present?).to be_truthy | ||
end | ||
end | ||
|
||
context "with a dummy version" do | ||
let(:version) { "1234567890" } | ||
|
||
# sanity check - if this is somehow a version we have, these tests will make no sense | ||
before { expect(my_region.migrations_ran).not_to include(version) } | ||
|
||
describe ".wait_for_remote_region_migrations" do | ||
let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.id) } | ||
|
||
it "sleeps until the migration is added" do | ||
# need to stub this because the thread uses a separate connection object which won't be in the same transaction | ||
allow(MiqRegion).to receive(:find).with(my_region.id).and_return(my_region) | ||
t = Thread.new do | ||
Thread.current.abort_on_exception = true | ||
described_class.wait_for_remote_region_migrations(subscription, version, 0) | ||
end | ||
|
||
# Try to pass execution to the created thread | ||
# NOTE: This is could definitely be a source of weird spec timing issues because | ||
# we're relying on the thread scheduler to pass to the next thread | ||
# when we sleep, but if this isn't here we likely won't execute the conditional | ||
# block in .wait_for_remote_region_migrations | ||
sleep 1 | ||
|
||
expect(t.alive?).to be true | ||
my_region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version) | ||
|
||
# Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens | ||
t = t.join(5) | ||
expect(t.status).to be false | ||
end | ||
end | ||
|
||
describe ".update_local_migrations_ran" do | ||
it "adds the given version when the direction is :up" do | ||
described_class.update_local_migrations_ran(version, :up) | ||
expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version) | ||
end | ||
|
||
it "doesn't blow up when there is no region" do | ||
MiqRegion.destroy_all | ||
MiqRegion.my_region_clear_cache | ||
described_class.update_local_migrations_ran(version, :up) | ||
end | ||
end | ||
end | ||
end |