From 727a053d92961fb2cb4fd3c6fa2b4f5d0d8cc1f3 Mon Sep 17 00:00:00 2001 From: Brandon Dunne Date: Fri, 25 Jan 2019 13:27:09 -0500 Subject: [PATCH] Re-work the migration checking logic - Before migrating the global, wait for all remotes to run the migration (check the remote schema_migrations table) - Run the migration on the global - After migrating the global, wait for the remote region record to include the migration. (verifying that replication is working) --- app/models/pglogical_subscription.rb | 14 ++-- lib/extensions/ar_migration.rb | 52 ++++++++------- spec/lib/extensions/ar_migration_spec.rb | 82 +++++++++++++----------- 3 files changed, 81 insertions(+), 67 deletions(-) diff --git a/app/models/pglogical_subscription.rb b/app/models/pglogical_subscription.rb index eae8dd90007e..2f9d2a781fc5 100644 --- a/app/models/pglogical_subscription.rb +++ b/app/models/pglogical_subscription.rb @@ -114,6 +114,13 @@ def backlog nil end + def with_remote_connection + find_password + MiqRegionRemote.with_remote_connection(host, port || 5432, user, decrypted_password, dbname, "postgresql") do |conn| + yield conn + end + end + # translate the output from the pglogical stored proc to our object columns def self.subscription_to_columns(sub) cols = sub.symbolize_keys @@ -256,11 +263,4 @@ def remote_replication_lsn def remote_node_lsn with_remote_connection(&:xlog_location) end - - def with_remote_connection - find_password - MiqRegionRemote.with_remote_connection(host, port || 5432, user, decrypted_password, dbname, "postgresql") do |conn| - yield conn - end - end end diff --git a/lib/extensions/ar_migration.rb b/lib/extensions/ar_migration.rb index 4dbc77ae397d..fce264c91edf 100644 --- a/lib/extensions/ar_migration.rb +++ b/lib/extensions/ar_migration.rb @@ -46,39 +46,48 @@ def initialize(subscription, version) end def wait_for_remote_region_migration(wait_time = 1) - return unless wait_for_migration? + return if version_exists_on_remote_region_schema_migrations? 
+ # Sit in a loop waiting for the remote schema_migrations table to include the version + wait_message = "Waiting for remote region #{region.region} to run migration #{version}" Vmdb.rails_logger.info(wait_message) print(wait_message) - while wait_for_migration? + until version_exists_on_remote_region_schema_migrations? print(".") - restart_subscription sleep(wait_time) - region.reload end puts("\n") + region.reload end - private + def wait_for_remote_migration_to_replicate(wait_time = 1) + # If !migrations_column_present?, we're not sure if we're replicating and can't tell, don't wait + return unless migrations_column_present? + return if region_record_includes_version? - def wait_for_migration? - migrations_column_present? ? !remote_region_migrated? : false - end + # Sit in a loop waiting for the remote schema_migrations table to include the version + wait_message = "Waiting for region #{region.region} record to include migration #{version}" + Vmdb.rails_logger.info(wait_message) + print(wait_message) + + until region_record_includes_version? + restart_subscription + print(".") + sleep(wait_time) + end - # NOTE: Check the local table first, but a migration can cause MiqRegion replication to stop (i.e. adding a new column to miq_regions). - # In that case, the global MiqRegion record for the remote region will not have the migration timestamp yet, so we need to check the - # schema_migrations table in the remote region directly. - def remote_region_migrated? - version_exists_in_replicated_region? || version_exists_on_remote_region? + puts("\n") end - def version_exists_in_replicated_region? - region.migrations_ran&.include?(version) + private + + def region_record_includes_version? + region.reload.migrations_ran&.include?(version) end - def version_exists_on_remote_region? + def version_exists_on_remote_region_schema_migrations? 
subscription.send(:with_remote_connection) do |connection| connection.select_values("SELECT 1 FROM schema_migrations WHERE version = '#{version}' LIMIT 1") end.any? @@ -88,10 +97,6 @@ def migrations_column_present? @migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present? end - def wait_message - @wait_message ||= "Waiting for remote region #{region.region} to run migration #{version}" - end - def restart_subscription c = HelperARClass.establish_connection.connection c.pglogical.subscription_disable(subscription.id) @@ -102,12 +107,15 @@ def restart_subscription end def migrate(direction) - PglogicalSubscription.all.each do |s| - RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration + subscriptions = PglogicalSubscription.all.collect do |s| + RemoteRegionMigrationWatcher.new(s, version.to_s).tap(&:wait_for_remote_region_migration) end ret = super PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction) + + subscriptions.each(&:wait_for_remote_migration_to_replicate) + ret end end diff --git a/spec/lib/extensions/ar_migration_spec.rb b/spec/lib/extensions/ar_migration_spec.rb index 045ae4468415..c86361c230fd 100644 --- a/spec/lib/extensions/ar_migration_spec.rb +++ b/spec/lib/extensions/ar_migration_spec.rb @@ -70,61 +70,67 @@ end end - describe "#wait_for_remote_region_migrations" do + describe "#wait_for_remote_region_migrations sleeps until the migration is in the remote regions schema_migrations table" do + it "does not enter the loop if the version already exists" do + expect(Vmdb.rails_logger).not_to receive(:info) + expect(subject).not_to receive(:print) + expect(subscription).to receive(:with_remote_connection).and_return([version]) + + subject.wait_for_remote_region_migration(0) + end + + it "enters the loop if the version does not exist" do + expect(subject).to receive(:print).with(".").twice + expect(subscription).to receive(:with_remote_connection).and_return([], [], [], 
[version]) + + subject.wait_for_remote_region_migration(0) + end + end + + describe "#wait_for_remote_migration_to_replicate" do context "without the migrations ran column present" do include_context "without the migrations ran column" it "does nothing" do expect(Vmdb.rails_logger).not_to receive(:info) - subject.wait_for_remote_region_migration + subject.wait_for_remote_migration_to_replicate end end - it "sleeps until the migration is replicated up" do - allow(subject).to receive(:restart_subscription) - allow(subject.region).to receive(:reload) - allow(subscription).to receive(:with_remote_connection).and_return([]) + context "with the migrations ran column present" do + def reload_called + @count ||= 0 - subject.region.update_attributes!(:migrations_ran => nil) + if @count == 5 + subject.region.update_attributes!(:migrations_ran => [version]) + end - t = Thread.new do - Thread.current.abort_on_exception = true - subject.wait_for_remote_region_migration(0) + @count += 1 end - # Try to pass execution to the created thread - # NOTE: This is could definitely be a source of weird spec timing issues because - # we're relying on the thread scheduler to pass to the next thread - # when we sleep, but if this isn't here we likely won't execute the conditional - # block in .wait_for_remote_region_migrations - sleep 1 + it "when migrations_ran is nil, sits in the loop until the version appears" do + subject.region.update_attributes!(:migrations_ran => nil) - expect(t.alive?).to be true - subject.region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version) + expect(subject).to receive(:restart_subscription).exactly(4).times + allow(subject.region).to receive(:reload).and_wrap_original do |m, _args| + reload_called + m.call + end - # Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens - t = t.join(5) - expect(t.status).to be false - end - - it "when migrations_ran is nil, it checks the 
schema_migrations table on the remote region" do - expect(subject).not_to receive(:restart_subscription) - expect(subject.region).not_to receive(:reload) - expect(subscription).to receive(:with_remote_connection).and_return([version]) - - subject.region.update_attributes!(:migrations_ran => nil) - - subject.wait_for_remote_region_migration(0) - end + subject.wait_for_remote_migration_to_replicate(0) + end - it "when migrations_ran does not include that expected migration, it checks the schema_migrations on the remote region" do - expect(subject).not_to receive(:restart_subscription) - expect(subject.region).not_to receive(:reload) - expect(subscription).to receive(:with_remote_connection).and_return([version]) + it "when migrations_ran is has other values, sits in the loop until the version appears" do + subject.region.update_attributes!(:migrations_ran => ["1", "2", "3"]) - subject.region.update_attributes!(:migrations_ran => ["1234", "5678"]) + expect(subject).to receive(:restart_subscription).exactly(4).times + allow(subject.region).to receive(:reload).and_wrap_original do |m, _args| + reload_called + m.call + end - subject.wait_for_remote_region_migration(0) + subject.wait_for_remote_migration_to_replicate(0) + end end end end