From 8cec170335b55ec1f3c507524dd8f246c84c477e Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 23 Apr 2019 11:41:37 -0400 Subject: [PATCH 01/23] s/REPLICATION_SET_NAME/PUBLICATION_NAME/ --- lib/miq_pglogical.rb | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/miq_pglogical.rb b/lib/miq_pglogical.rb index b42b8698898..a0d6927a409 100644 --- a/lib/miq_pglogical.rb +++ b/lib/miq_pglogical.rb @@ -5,7 +5,7 @@ class MiqPglogical include Vmdb::Logging - REPLICATION_SET_NAME = 'miq'.freeze + PUBLICATION_NAME = 'miq'.freeze NODE_PREFIX = "region_".freeze ALWAYS_EXCLUDED_TABLES = %w(ar_internal_metadata schema_migrations repl_events repl_monitor repl_nodes).freeze @@ -31,7 +31,7 @@ def active_excludes # Returns whether or not this server is configured as a provider node # @return Boolean def provider? - pglogical.enabled? && pglogical.replication_sets.include?(REPLICATION_SET_NAME) + pglogical.enabled? && pglogical.replication_sets.include?(PUBLICATION_NAME) end # Returns whether or not this server is configured as a subscriber node @@ -71,7 +71,7 @@ def configure_provider # database def destroy_provider return unless provider? 
- pglogical.replication_set_drop(REPLICATION_SET_NAME) + pglogical.replication_set_drop(PUBLICATION_NAME) drop_node pglogical.disable end @@ -79,12 +79,12 @@ def destroy_provider # Lists the tables currently being replicated by pglogical # @return Array the table list def included_tables - pglogical.tables_in_replication_set(REPLICATION_SET_NAME) + pglogical.tables_in_replication_set(PUBLICATION_NAME) end # Creates the 'miq' replication set and refreshes the excluded tables def create_replication_set - pglogical.replication_set_create(REPLICATION_SET_NAME) + pglogical.replication_set_create(PUBLICATION_NAME) refresh_excludes end @@ -104,17 +104,17 @@ def self.refresh_excludes(new_excludes) # Aligns the contents of the 'miq' replication set with the currently configured vmdb excludes def refresh_excludes - pglogical.with_replication_set_lock(REPLICATION_SET_NAME) do + pglogical.with_replication_set_lock(PUBLICATION_NAME) do # remove newly excluded tables from replication set newly_excluded_tables.each do |table| - _log.info("Removing #{table} from #{REPLICATION_SET_NAME} replication set") - pglogical.replication_set_remove_table(REPLICATION_SET_NAME, table) + _log.info("Removing #{table} from #{PUBLICATION_NAME} replication set") + pglogical.replication_set_remove_table(PUBLICATION_NAME, table) end # add tables to the set which are no longer excluded (or new) newly_included_tables.each do |table| - _log.info("Adding #{table} to #{REPLICATION_SET_NAME} replication set") - pglogical.replication_set_add_table(REPLICATION_SET_NAME, table, true) + _log.info("Adding #{table} to #{PUBLICATION_NAME} replication set") + pglogical.replication_set_add_table(PUBLICATION_NAME, table, true) end end end From c826707df42489c24f18b46dc621149fc0720168 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 23 Apr 2019 16:43:28 -0400 Subject: [PATCH 02/23] Use the new logical replication gem in MiqPglogical Most notably, the concept of a node goes away and we can set the tables in a 
publication directly rather than adding and removing them one by one. This also eliminates the need for a lock around the publication because the operation is atomic. --- lib/miq_pglogical.rb | 104 ++--------- spec/replication/util/miq_pglogical_spec.rb | 184 ++++++++------------ 2 files changed, 91 insertions(+), 197 deletions(-) diff --git a/lib/miq_pglogical.rb b/lib/miq_pglogical.rb index a0d6927a409..264166cee84 100644 --- a/lib/miq_pglogical.rb +++ b/lib/miq_pglogical.rb @@ -1,90 +1,58 @@ require 'pg' -require 'pg/pglogical' -require 'pg/pglogical/active_record_extension' +require 'pg/logical_replication' class MiqPglogical include Vmdb::Logging PUBLICATION_NAME = 'miq'.freeze - NODE_PREFIX = "region_".freeze ALWAYS_EXCLUDED_TABLES = %w(ar_internal_metadata schema_migrations repl_events repl_monitor repl_nodes).freeze attr_reader :configured_excludes def initialize - @connection = ApplicationRecord.connection + @connection = ApplicationRecord.connection.raw_connection self.configured_excludes = provider? ? active_excludes : self.class.default_excludes end - # Sets the tables that should be used to create the replication set using refresh_excludes + # Sets the tables that should be used to create the publication using refresh_excludes def configured_excludes=(new_excludes) @configured_excludes = new_excludes | ALWAYS_EXCLUDED_TABLES end - # Returns the excluded tables that are currently being used by pglogical + # Returns the excluded tables that are currently being used # @return Array the table list def active_excludes return [] unless provider? - @connection.tables - included_tables + ApplicationRecord.connection.tables - included_tables end - # Returns whether or not this server is configured as a provider node - # @return Boolean def provider? - pglogical.enabled? 
&& pglogical.replication_sets.include?(PUBLICATION_NAME) + pglogical.publications.map { |p| p["name"] }.include?(PUBLICATION_NAME) end - # Returns whether or not this server is configured as a subscriber node - # @return Boolean def subscriber? - pglogical.enabled? && !pglogical.subscriptions.empty? + !pglogical.subscriptions.empty? end - # Returns whether or not this server is a pglogical node - def node? - pglogical.enabled? && pglogical.nodes.field_values("name").include?(self.class.local_node_name) - end - - # Creates a pglogical node using the rails connection - def create_node - pglogical.node_create(self.class.local_node_name, connection_dsn) - end - - # Drops the pglogical node associated with this connection - def drop_node - pglogical.node_drop(self.class.local_node_name, true) - end - - # Configures the database as a pglogical replication source - # This includes enabling the extension, creating the - # node and creating the replication set def configure_provider return if provider? - @connection.transaction(:requires_new => true) do - pglogical.enable - create_node unless node? - create_replication_set - end + create_replication_set end - # Removes the replication configuration and pglogical node from the - # database def destroy_provider return unless provider? 
- pglogical.replication_set_drop(PUBLICATION_NAME) - drop_node - pglogical.disable + pglogical.drop_publication(PUBLICATION_NAME) end - # Lists the tables currently being replicated by pglogical + # Lists the tables currently being replicated # @return Array the table list def included_tables - pglogical.tables_in_replication_set(PUBLICATION_NAME) + pglogical.tables_in_publication(PUBLICATION_NAME) end - # Creates the 'miq' replication set and refreshes the excluded tables + # Creates the 'miq' publication and refreshes the excluded tables def create_replication_set - pglogical.replication_set_create(PUBLICATION_NAME) + pglogical.create_publication(PUBLICATION_NAME) refresh_excludes end @@ -102,21 +70,10 @@ def self.refresh_excludes(new_excludes) pgl.refresh_excludes end - # Aligns the contents of the 'miq' replication set with the currently configured vmdb excludes + # Aligns the contents of the 'miq' publication with the currently configured excludes def refresh_excludes - pglogical.with_replication_set_lock(PUBLICATION_NAME) do - # remove newly excluded tables from replication set - newly_excluded_tables.each do |table| - _log.info("Removing #{table} from #{PUBLICATION_NAME} replication set") - pglogical.replication_set_remove_table(PUBLICATION_NAME, table) - end - - # add tables to the set which are no longer excluded (or new) - newly_included_tables.each do |table| - _log.info("Adding #{table} to #{PUBLICATION_NAME} replication set") - pglogical.replication_set_add_table(PUBLICATION_NAME, table, true) - end - end + tables = ApplicationRecord.connection.tables - configured_excludes + pglogical.set_publication_tables(PUBLICATION_NAME, tables) end def replication_lag @@ -127,18 +84,6 @@ def replication_wal_retained pglogical.wal_retained_bytes end - def self.local_node_name - region_to_node_name(MiqRegion.my_region_number) - end - - def self.region_to_node_name(region_id) - "#{NODE_PREFIX}#{region_id}" - end - - def self.node_name_to_region(name) - 
name.sub(NODE_PREFIX, "").to_i - end - def self.default_excludes YAML.load_file(Rails.root.join("config/default_replication_exclude_tables.yml"))[:exclude_tables] | ALWAYS_EXCLUDED_TABLES end @@ -160,21 +105,6 @@ def self.save_global_region(subscriptions_to_save, subscriptions_to_remove) def pglogical(refresh = false) @pglogical = nil if refresh - @pglogical ||= @connection.pglogical - end - - def connection_dsn - config = @connection.raw_connection.conninfo_hash.delete_blanks - PG::Connection.parse_connect_args(config) - end - - # tables that are currently included, but we want them excluded - def newly_excluded_tables - included_tables & configured_excludes - end - - # tables that are currently excluded, but we want them included - def newly_included_tables - (@connection.tables - configured_excludes) - included_tables + @pglogical ||= PG::LogicalReplication::Client.new(@connection) end end diff --git a/spec/replication/util/miq_pglogical_spec.rb b/spec/replication/util/miq_pglogical_spec.rb index 98d3a05442b..2ab1dd9d442 100644 --- a/spec/replication/util/miq_pglogical_spec.rb +++ b/spec/replication/util/miq_pglogical_spec.rb @@ -1,146 +1,110 @@ describe MiqPglogical do - context "requires pglogical been installed" do - let(:connection) { ApplicationRecord.connection } - let(:pglogical) { connection.pglogical } + let(:ar_connection) { ApplicationRecord.connection } + let(:pglogical) { PG::LogicalReplication::Client.new(ar_connection.raw_connection) } + before do + EvmSpecHelper.local_miq_server + end + + describe "#active_excludes" do + it "returns an empty array if a provider is not configured" do + expect(subject.active_excludes).to eq([]) + end + end + + describe "#provider?" 
do + it "is false when a provider is not configured" do + expect(subject.provider?).to be false + end + end + + describe "#configure_provider" do + it "creates the publication" do + subject.configure_provider + expect(pglogical.publications.first["name"]).to eq(described_class::PUBLICATION_NAME) + end + end + + context "when configured as a provider" do before do - skip "pglogical must be installed" unless pglogical.installed? - EvmSpecHelper.local_miq_server + subject.configure_provider end describe "#active_excludes" do - it "returns an empty array if a provider is not configured" do - expect(subject.active_excludes).to eq([]) + it "returns the initial set of excluded tables" do + expect(subject.active_excludes).to eq(ar_connection.tables - subject.included_tables) end end describe "#provider?" do - it "is false when a provider is not configured" do - expect(subject.provider?).to be false + it "is true" do + expect(subject.provider?).to be true end end - describe "#node?" do - it "is false when a provider is not configured" do - expect(subject.node?).to be false + describe "#destroy_provider" do + it "removes the provider configuration" do + subject.destroy_provider + expect(subject.provider?).to be false end end - describe "#configure_provider" do - it "enables the extenstion and creates the replication set" do - subject.configure_provider - expect(pglogical.enabled?).to be true - expect(pglogical.replication_sets).to include(described_class::REPLICATION_SET_NAME) - end - - it "does not enable the extension when an exception is raised" do - expect(subject).to receive(:create_replication_set).and_raise(PG::UniqueViolation) - expect { subject.configure_provider }.to raise_error(PG::UniqueViolation) - expect(pglogical.enabled?).to be false + describe "#create_replication_set" do + it "creates the correct initial set" do + expected_excludes = subject.configured_excludes + extra_excludes = subject.configured_excludes - ar_connection.tables + actual_excludes = 
ar_connection.tables - subject.included_tables + expect(actual_excludes | extra_excludes).to match_array(expected_excludes) end end - context "when configured as a provider" do - before do - subject.configure_provider - end - - describe "#active_excludes" do - it "returns the initial set of excluded tables" do - expect(subject.active_excludes).to eq(connection.tables - subject.included_tables) - end - end + describe ".refresh_excludes" do + it "sets the configured excludes and calls refresh on an instance" do + pgl = described_class.new + expect(described_class).to receive(:new).and_return(pgl) + expect(pgl).to receive(:refresh_excludes) - describe "#provider?" do - it "is true" do - expect(subject.provider?).to be true - end - end + new_excludes = %w(my new exclude tables) + described_class.refresh_excludes(new_excludes) - describe "#node?" do - it "is true" do - expect(subject.node?).to be true - end + expect(pgl.configured_excludes).to match_array(new_excludes | described_class::ALWAYS_EXCLUDED_TABLES) end + end - describe "#destroy_provider" do - it "removes the provider configuration" do - subject.destroy_provider - expect(subject.provider?).to be false - expect(subject.node?).to be false - expect(connection.extension_enabled?("pglogical")).to be false - end + describe "#refresh_excludes" do + it "adds a new non excluded table" do + ar_connection.exec_query(<<-SQL) + CREATE TABLE test (id INTEGER PRIMARY KEY) + SQL + subject.refresh_excludes + expect(subject.included_tables).to include("test") end - describe "#create_replication_set" do - it "creates the correct initial set" do - expected_excludes = subject.configured_excludes - extra_excludes = subject.configured_excludes - connection.tables - actual_excludes = connection.tables - subject.included_tables - expect(actual_excludes | extra_excludes).to match_array(expected_excludes) - end - end + it "removes a newly excluded table" do + table = subject.included_tables.first + subject.configured_excludes += 
[table] - describe ".refresh_excludes" do - it "sets the configured excludes and calls refresh on an instance" do - pgl = described_class.new - expect(described_class).to receive(:new).and_return(pgl) - expect(pgl).to receive(:refresh_excludes) + expect(subject.active_excludes).not_to include(table) + expect(subject.included_tables).to include(table) - new_excludes = %w(my new exclude tables) - described_class.refresh_excludes(new_excludes) + subject.refresh_excludes - expect(pgl.configured_excludes).to match_array(new_excludes | described_class::ALWAYS_EXCLUDED_TABLES) - end + expect(subject.active_excludes).to include(table) + expect(subject.included_tables).not_to include(table) end - describe "#refresh_excludes" do - it "adds a new non excluded table" do - connection.exec_query(<<-SQL) - CREATE TABLE test (id INTEGER PRIMARY KEY) - SQL - subject.refresh_excludes - expect(subject.included_tables).to include("test") - end - - it "removes a newly excluded table" do - table = subject.included_tables.first - subject.configured_excludes += [table] - - expect(subject.active_excludes).not_to include(table) - expect(subject.included_tables).to include(table) - - subject.refresh_excludes + it "adds a newly included table" do + current_excludes = subject.configured_excludes + table = current_excludes.pop + subject.configured_excludes = current_excludes - expect(subject.active_excludes).to include(table) - expect(subject.included_tables).not_to include(table) - end + expect(subject.active_excludes).to include(table) + expect(subject.included_tables).not_to include(table) - it "adds a newly included table" do - current_excludes = subject.configured_excludes - table = current_excludes.pop - subject.configured_excludes = current_excludes - - expect(subject.active_excludes).to include(table) - expect(subject.included_tables).not_to include(table) - - subject.refresh_excludes - - expect(subject.active_excludes).not_to include(table) - expect(subject.included_tables).to 
include(table) - end - end - end - - describe ".region_to_node_name" do - it "returns the correct name" do - expect(described_class.region_to_node_name(4)).to eq("region_4") - end - end + subject.refresh_excludes - describe ".node_name_to_region" do - it "returns the correct region" do - expect(described_class.node_name_to_region("region_5")).to eq(5) + expect(subject.active_excludes).not_to include(table) + expect(subject.included_tables).to include(table) end end end From 1bb5deb5791dfcb2c34ef16a45c911fa51f46369 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 23 Apr 2019 16:45:02 -0400 Subject: [PATCH 03/23] Remove the MiqRegion.replication_enabled? method No one is using this and the concept of a "node" is gone now --- app/models/miq_region.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/app/models/miq_region.rb b/app/models/miq_region.rb index 80facb45079..0e3677d6caf 100644 --- a/app/models/miq_region.rb +++ b/app/models/miq_region.rb @@ -150,10 +150,6 @@ def self.global_replication_type? MiqPglogical.new.subscriber? end - def self.replication_enabled? - MiqPglogical.new.node? - end - def self.replication_type if global_replication_type? 
:global From 07b0570e4437345c07f29bf91e9d6f7302e4a563 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Wed, 24 Apr 2019 17:00:21 -0400 Subject: [PATCH 04/23] Update PglogicalSubscription model to use new logical replication gem --- app/models/pglogical_subscription.rb | 82 +++----- spec/models/pglogical_subscription_spec.rb | 228 +++++---------------- 2 files changed, 82 insertions(+), 228 deletions(-) diff --git a/app/models/pglogical_subscription.rb b/app/models/pglogical_subscription.rb index 1da59a85cf6..925c0965724 100644 --- a/app/models/pglogical_subscription.rb +++ b/app/models/pglogical_subscription.rb @@ -1,9 +1,5 @@ -# This model wraps a pglogical stored proc (pglogical.show_subscription_status) -# This is exposed to us through the PostgreSQLAdapter#pglogical object's #subscriptions method -# This model then exposes select values returned from that method require 'pg/dsn_parser' -require 'pg/pglogical' -require 'pg/pglogical/active_record_extension' +require 'pg/logical_replication' class PglogicalSubscription < ActsAsArModel set_columns_hash( @@ -65,12 +61,8 @@ def self.save_all!(subscription_list) end def delete(reload_failover_monitor_service = true) - pglogical.subscription_drop(id, true) + pglogical.drop_subscription(id, true) MiqRegion.destroy_region(connection, provider_region) - if self.class.count == 0 - pglogical.node_drop(MiqPglogical.local_node_name, true) - pglogical.disable - end EvmDatabase.restart_failover_monitor_service_queue if reload_failover_monitor_service end @@ -81,16 +73,16 @@ def self.delete_all(list = nil) end def disable - pglogical.subscription_disable(id).check + pglogical.disable_subscription(id).check end def enable - pglogical.subscription_enable(id).check + pglogical.enable_subscription(id).check end def self.pglogical(refresh = false) @pglogical = nil if refresh - @pglogical ||= connection.pglogical + @pglogical ||= PG::LogicalReplication::Client.new(connection.raw_connection) end def pglogical(refresh = false) @@ 
-108,7 +100,7 @@ def validate(new_connection_params = {}) end def backlog - connection.xlog_location_diff(remote_node_lsn, remote_replication_lsn) + connection.xlog_location_diff(remote_region_lsn, subscription_status["remote_replication_lsn"]) rescue PG::Error => e _log.error(e.message) nil @@ -119,18 +111,19 @@ def self.subscription_to_columns(sub) cols = sub.symbolize_keys # delete the things we dont care about + cols.delete(:owner) cols.delete(:slot_name) - cols.delete(:replication_sets) - cols.delete(:forward_origins) + cols.delete(:publications) cols.delete(:remote_replication_lsn) cols.delete(:local_replication_lsn) cols[:id] = cols.delete(:subscription_name) + cols[:status] = cols.delete(:enabled) ? "replicating" : "down" # create the individual dsn columns - cols.merge!(dsn_attributes(cols.delete(:provider_dsn))) + cols.merge!(dsn_attributes(cols.delete(:subscription_dsn))) - cols.merge!(provider_node_attributes(cols.delete(:provider_node))) + cols.merge!(remote_region_attributes(cols[:id])) end private_class_method :subscription_to_columns @@ -143,17 +136,17 @@ def self.dsn_attributes(dsn) end private_class_method :dsn_attributes - def self.provider_node_attributes(node_name) + def self.remote_region_attributes(subscription_name) attrs = {} - attrs[:provider_region] = MiqPglogical.node_name_to_region(node_name) + attrs[:provider_region] = subscription_name.split("_")[1].to_i region = MiqRegion.find_by_region(attrs[:provider_region]) attrs[:provider_region_name] = region.description if region attrs end - private_class_method :provider_node_attributes + private_class_method :remote_region_attributes def self.subscriptions - pglogical.enabled? ? pglogical.subscriptions : [] + pglogical.subscriptions end private_class_method :subscriptions @@ -188,43 +181,23 @@ def new_subscription_name "region_#{remote_region_number}_subscription" end - def ensure_node_created - return if MiqPglogical.new.node? 
- - pglogical.enable - node_dsn = PG::Connection.parse_connect_args(connection.raw_connection.conninfo_hash.delete_blanks) - pglogical.node_create(MiqPglogical.local_node_name, node_dsn).check - end - - def with_subscription_disabled - disable - yield - ensure - enable - end - def update_subscription - with_subscription_disabled do - provider_node_name = MiqPglogical.region_to_node_name(provider_region) - find_password if password.nil? - pglogical.node_dsn_update(provider_node_name, dsn) - end + find_password if password.nil? + pglogical.set_subscription_conninfo(id, conn_info_hash) self end # sets this instance's password field to the one in the subscription dsn in the database def find_password return password if password.present? - s = pglogical.subscription_show_status(id).symbolize_keys - dsn_hash = PG::DSNParser.parse(s.delete(:provider_dsn)) + s = subscription_status.symbolize_keys + dsn_hash = PG::DSNParser.parse(s.delete(:subscription_dsn)) self.password = dsn_hash[:password] end def create_subscription - ensure_node_created MiqRegion.destroy_region(connection, remote_region_number) - pglogical.subscription_create(new_subscription_name, dsn, [MiqPglogical::REPLICATION_SET_NAME], - false).check + pglogical.create_subscription(new_subscription_name, conn_info_hash, [MiqPglogical::PUBLICATION_NAME]).check self end @@ -234,26 +207,21 @@ def assert_different_region! 
end end - def dsn - conf = { + def conn_info_hash + { :dbname => dbname, :host => host, :user => user, :password => decrypted_password, :port => port }.delete_blanks - PG::Connection.parse_connect_args(conf) end def decrypted_password ManageIQ::Password.try_decrypt(password) end - def remote_replication_lsn - pglogical.subscription_show_status(id)["remote_replication_lsn"] - end - - def remote_node_lsn + def remote_region_lsn with_remote_connection(&:xlog_location) end @@ -263,4 +231,8 @@ def with_remote_connection yield conn end end + + def subscription_status + pglogical.subscriptions.find { |s| s["subscription_name"] == id } + end end diff --git a/spec/models/pglogical_subscription_spec.rb b/spec/models/pglogical_subscription_spec.rb index 13bd718277c..4b566a0fe70 100644 --- a/spec/models/pglogical_subscription_spec.rb +++ b/spec/models/pglogical_subscription_spec.rb @@ -5,23 +5,21 @@ [ { "subscription_name" => "region_#{remote_region1}_subscription", - "status" => "replicating", - "provider_node" => "region_#{remote_region1}", - "provider_dsn" => "dbname = 'vmdb\\'s_test' host='example.com' user='root' port='' password='p=as\\' s\\''", - "slot_name" => "pgl_vmdb_test_region_#{remote_region1}_subscripdb71d61", - "replication_sets" => ["miq"], - "forward_origins" => ["all"], + "owner" => "root", + "enabled" => true, + "subscription_dsn" => "dbname = 'vmdb\\'s_test' host='example.com' user='root' port='' password='p=as\\' s\\''", + "slot_name" => "region_#{remote_region1}_subscription", + "publications" => ["miq"], "remote_replication_lsn" => "0/420D9A0", "local_replication_lsn" => "18/72DE8268" }, { "subscription_name" => "region_#{remote_region2}_subscription", - "status" => "disabled", - "provider_node" => "region_#{remote_region2}", - "provider_dsn" => "dbname = vmdb_test2 host=test.example.com user = postgres port=5432 fallback_application_name='bin/rails'", - "slot_name" => "pgl_vmdb_test_region_#{remote_region2}_subscripdb71d61", - "replication_sets" => 
["miq"], - "forward_origins" => ["all"], + "owner" => "root", + "enabled" => false, + "subscription_dsn" => "dbname = vmdb_test2 host=test.example.com user = postgres port=5432 fallback_application_name='bin/rails'", + "slot_name" => "region_#{remote_region2}_subscription", + "publications" => ["miq"], "remote_replication_lsn" => "1/53E9A8", "local_replication_lsn" => "20/72FF8369" } @@ -41,7 +39,7 @@ }, { "id" => "region_#{remote_region2}_subscription", - "status" => "disabled", + "status" => "down", "dbname" => "vmdb_test2", "host" => "test.example.com", "user" => "postgres", @@ -83,11 +81,6 @@ expect(described_class.all).to be_empty expect(described_class.find(:all)).to be_empty end - - it "retrieves an empty array with pglogical disabled" do - with_pglogical_disabled - expect(described_class.all).to be_empty - end end describe ".first" do @@ -101,11 +94,6 @@ with_no_records expect(described_class.find(:first)).to be_nil end - - it "returns nil with pglogical disabled" do - with_pglogical_disabled - expect(described_class.find(:first)).to be_nil - end end describe ".last" do @@ -119,11 +107,6 @@ with_no_records expect(described_class.find(:last)).to be_nil end - - it "returns nil with :last" do - with_pglogical_disabled - expect(described_class.find(:last)).to be_nil - end end describe ".find" do @@ -138,11 +121,6 @@ with_no_records expect { described_class.find("doesnt_exist") }.to raise_error(ActiveRecord::RecordNotFound) end - - it "raises with pglogical disabled" do - with_pglogical_disabled - expect { described_class.find("doesnt_exist") }.to raise_error(ActiveRecord::RecordNotFound) - end end describe ".find_by_id" do @@ -157,11 +135,6 @@ with_no_records expect(described_class.find_by_id("some_subscription")).to be_nil end - - it "returns nil with pglogical disabled" do - with_pglogical_disabled - expect(described_class.find_by_id("some_subscription")).to be_nil - end end describe "#save!" 
do @@ -169,11 +142,9 @@ let(:sub) { described_class.new(:host => "test-2.example.com", :user => "root", :password => "1234") } before do with_no_records - allow(pglogical).to receive(:subscription_show_status).and_return(subscriptions.first) - allow(pglogical).to receive(:subscription_create).and_return(double(:check => nil)) + allow(pglogical).to receive(:create_subscription).and_return(double(:check => nil)) allow(MiqRegionRemote).to receive(:with_remote_connection).and_yield(double(:connection)) allow(sub).to receive(:remote_region_number).and_return(remote_region1) - allow(sub).to receive(:ensure_node_created).and_return(true) end it "doesn't queue a message to restart the failover monitor service when passed 'false'" do @@ -189,118 +160,60 @@ it "raises when subscribing to the same region" do with_no_records - allow(pglogical).to receive(:subscription_show_status).and_return(subscriptions.first) allow(MiqRegionRemote).to receive(:with_remote_connection).and_yield(double(:connection)) - sub = described_class.new(:host => "some.host.example.com") + sub = described_class.new(:host => "some.host.example.com", :password => "password") expect { sub.save! }.to raise_error(RuntimeError, "Subscriptions cannot be created to the same region as the current region") end it "does not raise when subscribing to a different region" do with_no_records - allow(pglogical).to receive(:subscription_show_status).and_return(subscriptions.first) - allow(pglogical).to receive(:subscription_create).and_return(double(:check => nil)) + allow(pglogical).to receive(:create_subscription).and_return(double(:check => nil)) allow(MiqRegionRemote).to receive(:with_remote_connection).and_yield(double(:connection)) sub = described_class.new(:host => "test-2.example.com", :user => "root", :password => "1234") allow(sub).to receive(:remote_region_number).and_return(remote_region1) - allow(sub).to receive(:ensure_node_created).and_return(true) expect { sub.save! 
}.not_to raise_error end - it "creates the node when there are no subscriptions" do + it "creates the subscription" do with_no_records allow(MiqRegionRemote).to receive(:with_remote_connection).and_yield(double(:connection)) allow(MiqRegionRemote).to receive(:region_number_from_sequence).and_return(2) - # node created if we are not already a node - expect(MiqPglogical).to receive(:new).and_return(double(:node? => false)) - expect(pglogical).to receive(:enable) - expect(pglogical).to receive(:node_create).and_return(double(:check => nil)) - - # subscription is created - expect(pglogical).to receive(:subscription_create) do |name, dsn, replication_sets, sync_structure| - expect(name).to eq("region_2_subscription") - expect(dsn).to include("host='test-2.example.com'") - expect(dsn).to include("user='root'") - expect(replication_sets).to eq(['miq']) - expect(sync_structure).to be false - end.and_return(double(:check => nil)) + dsn = { + :host => "test-2.example.com", + :user => "root", + :password => "1234" + } + expect(pglogical).to receive(:create_subscription).with("region_2_subscription", dsn, ['miq']).and_return(double(:check => nil)) sub = described_class.new(:host => "test-2.example.com", :user => "root", :password => "1234") allow(sub).to receive(:assert_different_region!) - sub.save! - end - - it "doesnt create the node when we are already a node" do - with_no_records - allow(MiqRegionRemote).to receive(:with_remote_connection).and_yield(double(:connection)) - allow(MiqRegionRemote).to receive(:region_number_from_sequence).and_return(2) - - # node not created if we are already a node - expect(MiqPglogical).to receive(:new).and_return(double(:node? 
=> true)) - expect(pglogical).not_to receive(:enable) - expect(pglogical).not_to receive(:node_create) - - # subscription is created - expect(pglogical).to receive(:subscription_create) do |name, dsn, replication_sets, sync_structure| - expect(name).to eq("region_2_subscription") - expect(dsn).to include("host='test-2.example.com'") - expect(dsn).to include("user='root'") - expect(replication_sets).to eq(['miq']) - expect(sync_structure).to be false - end.and_return(double(:check => nil)) - - sub = described_class.new(:host => "test-2.example.com", :password => "1234", :user => "root") - allow(sub).to receive(:assert_different_region!) - sub.save! expect(sub).to be_an_instance_of(described_class) end it "updates the dsn when an existing subscription is saved" do with_records - allow(pglogical).to receive(:subscription_show_status).and_return(subscriptions.first) allow(MiqRegionRemote).to receive(:with_remote_connection).and_yield(double(:connection)) sub = described_class.find(:first) sub.host = "other-host.example.com" allow(sub).to receive(:assert_different_region!) 
- expect(pglogical).to receive(:subscription_disable).with(sub.id) - .and_return(double(:check => nil)) - expect(pglogical).to receive(:node_dsn_update) do |provider_node_name, new_dsn| - expect(provider_node_name).to eq("region_#{remote_region1}") - expect(new_dsn).to include("host='other-host.example.com'") - expect(new_dsn).to include("dbname='vmdb\\'s_test'") - expect(new_dsn).to include("user='root'") - expect(new_dsn).to include("password='p=as\\' s\\''") - end - expect(pglogical).to receive(:subscription_enable).with(sub.id) - .and_return(double(:check => nil)) + new_dsn = { + :host => "other-host.example.com", + :dbname => sub.dbname, + :user => sub.user, + :password => "p=as\' s\'" + } + expect(pglogical).to receive(:set_subscription_conninfo).with(sub.id, new_dsn) expect(sub.save!).to eq(sub) end - - it "reenables the subscription when the dsn fails to save" do - with_records - allow(pglogical).to receive(:subscription_show_status).and_return(subscriptions.first) - allow(MiqRegionRemote).to receive(:with_remote_connection).and_yield(double(:connection)) - - sub = described_class.find(:first) - sub.host = "other-host.example.com" - allow(sub).to receive(:assert_different_region!) - - expect(pglogical).to receive(:subscription_disable).with(sub.id) - .and_return(double(:check => nil)) - expect(pglogical).to receive(:node_dsn_update).and_raise("Some Error") - expect(pglogical).to receive(:subscription_enable).with(sub.id) - .and_return(double(:check => nil)) - - expect { sub.save! 
}.to raise_error(RuntimeError, "Some Error") - end end describe ".delete_all" do @@ -335,30 +248,23 @@ allow(MiqRegionRemote).to receive(:with_remote_connection).and_yield(double(:connection)) allow(MiqRegionRemote).to receive(:region_number_from_sequence).and_return(2, 2, 3, 3) - # node created - allow(pglogical).to receive(:enable) - allow(pglogical).to receive(:node_create).and_return(double(:check => nil)) - - # subscription is created - expect(pglogical).to receive(:subscription_create) do |name, dsn, replication_sets, sync_structure| - expect(name).to eq("region_2_subscription") - expect(dsn).to include("host='test-2.example.com'") - expect(dsn).to include("user='root'") - expect(replication_sets).to eq(['miq']) - expect(sync_structure).to be false - end.and_return(double(:check => nil)) - - expect(pglogical).to receive(:subscription_create) do |name, dsn, replication_sets, sync_structure| - expect(name).to eq("region_3_subscription") - expect(dsn).to include("host='test-3.example.com'") - expect(dsn).to include("user='miq'") - expect(replication_sets).to eq(['miq']) - expect(sync_structure).to be false - end.and_return(double(:check => nil)) + dsn2 = { + :host => "test-2.example.com", + :user => "root", + :password => "1234" + } + expect(pglogical).to receive(:create_subscription).with("region_2_subscription", dsn2, ['miq']).and_return(double(:check => nil)) + + dsn3 = { + :host => "test-3.example.com", + :user => "miq", + :password => "1234" + } + expect(pglogical).to receive(:create_subscription).with("region_3_subscription", dsn3, ['miq']).and_return(double(:check => nil)) to_save = [] - to_save << described_class.new(:host => "test-2.example.com", :password => "1234", :user => "root") - to_save << described_class.new(:host => "test-3.example.com", :password => "1234", :user => "miq") + to_save << described_class.new(dsn2) + to_save << described_class.new(dsn3) to_save.each { |s| allow(s).to receive(:assert_different_region!) 
} described_class.save_all!(to_save) @@ -369,20 +275,14 @@ allow(MiqRegionRemote).to receive(:with_remote_connection).and_yield(double(:connection)) allow(MiqRegionRemote).to receive(:region_number_from_sequence).and_return(2, 2, 3, 3, 4, 4) - # node created - allow(pglogical).to receive(:enable) - allow(pglogical).to receive(:node_create).and_return(double(:check => nil)) - - # subscription is created - expect(pglogical).to receive(:subscription_create).ordered.and_raise(PG::Error.new("Error one")) - expect(pglogical).to receive(:subscription_create) do |name, dsn, replication_sets, sync_structure| - expect(name).to eq("region_3_subscription") - expect(dsn).to include("host='test-3.example.com'") - expect(dsn).to include("user='miq'") - expect(replication_sets).to eq(['miq']) - expect(sync_structure).to be false - end.ordered.and_return(double(:check => nil)) - expect(pglogical).to receive(:subscription_create).ordered.and_raise("Error two") + expect(pglogical).to receive(:create_subscription).ordered.and_raise(PG::Error.new("Error one")) + dsn3 = { + :host => "test-3.example.com", + :user => "miq", + :password => "1234" + } + expect(pglogical).to receive(:create_subscription).ordered.with("region_3_subscription", dsn3, ['miq']).and_return(double(:check => nil)) + expect(pglogical).to receive(:create_subscription).ordered.and_raise("Error two") to_save = [] to_save << described_class.new(:host => "test-2.example.com", :user => "root", :password => "1234") @@ -396,20 +296,14 @@ end describe "#delete" do - before do - allow(pglogical).to receive(:enabled?).and_return(true) - end - let(:sub) { described_class.find(:first) } - it "drops the node when this is the last subscription" do + it "drops the subscription" do allow(pglogical).to receive(:subscriptions).and_return([subscriptions.first], []) - expect(pglogical).to receive(:subscription_drop).with("region_#{remote_region1}_subscription", true) + expect(pglogical).to 
receive(:drop_subscription).with("region_#{remote_region1}_subscription", true) expect(MiqRegion).to receive(:destroy_region) .with(instance_of(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter), remote_region1) - expect(pglogical).to receive(:node_drop).with("region_#{MiqRegion.my_region_number}", true) - expect(pglogical).to receive(:disable) sub.delete end @@ -417,7 +311,7 @@ it "doesn't queue a failover monitor restart when passed false" do allow(pglogical).to receive(:subscriptions).and_return(subscriptions, [subscriptions.last]) - expect(pglogical).to receive(:subscription_drop).with("region_#{remote_region1}_subscription", true) + expect(pglogical).to receive(:drop_subscription).with("region_#{remote_region1}_subscription", true) expect(MiqQueue.where(:method_name => "restart_failover_monitor_service")).to be_empty sub.delete(false) @@ -426,9 +320,7 @@ describe "#validate" do it "validates existing subscriptions with new parameters" do - allow(pglogical).to receive(:enabled?).and_return(true) allow(pglogical).to receive(:subscriptions).and_return([subscriptions.first]) - allow(pglogical).to receive(:subscription_show_status).and_return(subscriptions.first) sub = described_class.find(:first) expect(sub.host).to eq "example.com" @@ -447,7 +339,6 @@ sub.user = "root" sub.dbname = "vmdb_production" - expect(pglogical).not_to receive(:subscription_show_status) expect(MiqRegionRemote).to receive(:validate_connection_settings) .with("my.example.com", nil, "root", "thepassword", "vmdb_production") sub.validate @@ -456,7 +347,6 @@ it "validates connection parameters without accessing database or initializing subscription parameters" do sub = described_class.new - expect(pglogical).not_to receive(:subscription_show_status) expect(MiqRegionRemote).to receive(:validate_connection_settings) .with("my.example.com", nil, "root", "mypass", "vmdb_production") sub.validate('host' => "my.example.com", 'user' => "root", 'password' => "mypass", 'dbname' => "vmdb_production") 
@@ -467,9 +357,7 @@ let(:remote_connection) { double(:remote_connection) } before do - allow(pglogical).to receive(:enabled?).and_return(true) allow(pglogical).to receive(:subscriptions).and_return([subscriptions.first]) - allow(pglogical).to receive(:subscription_show_status).and_return(subscriptions.first) end it "returns the correct value" do @@ -479,7 +367,7 @@ expect(described_class.first.backlog).to eq(12_120) end - it "returns nill if error raised inside" do + it "returns nil if error raised inside" do expect(MiqRegionRemote).to receive(:with_remote_connection).and_raise(PG::Error) expect(described_class.first.backlog).to be nil @@ -490,15 +378,9 @@ def with_records allow(pglogical).to receive(:subscriptions).and_return(subscriptions) - allow(pglogical).to receive(:enabled?).and_return(true) end def with_no_records allow(pglogical).to receive(:subscriptions).and_return([]) - allow(pglogical).to receive(:enabled?).and_return(true) - end - - def with_pglogical_disabled - allow(pglogical).to receive(:enabled?).and_return(false) end end From eb9846420e6a656a3b7bdc43963687ad6d898fa1 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Wed, 24 Apr 2019 17:21:58 -0400 Subject: [PATCH 05/23] Add new replication gem to Gemfile --- Gemfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Gemfile b/Gemfile index 4b58aa9a893..910887799b4 100644 --- a/Gemfile +++ b/Gemfile @@ -163,6 +163,7 @@ end group :replication, :manageiq_default do gem "pg-pglogical", "~>2.1.2", :require => false + gem "pg-logical_replication", "~>0.1", :git => "https://github.com/ManageIQ/pg-logical_replication.git", :require => false, :branch => "master" end group :rest_api, :manageiq_default do From cf8fa22d44b91460c08d2ac238008f525d286279 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 29 Apr 2019 13:24:14 -0400 Subject: [PATCH 06/23] Rename MiqPglogical @connection instance var to @pg_connection --- lib/miq_pglogical.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/lib/miq_pglogical.rb b/lib/miq_pglogical.rb index 264166cee84..46e9ecc5f57 100644 --- a/lib/miq_pglogical.rb +++ b/lib/miq_pglogical.rb @@ -10,7 +10,7 @@ class MiqPglogical attr_reader :configured_excludes def initialize - @connection = ApplicationRecord.connection.raw_connection + @pg_connection = ApplicationRecord.connection.raw_connection self.configured_excludes = provider? ? active_excludes : self.class.default_excludes end @@ -105,6 +105,6 @@ def self.save_global_region(subscriptions_to_save, subscriptions_to_remove) def pglogical(refresh = false) @pglogical = nil if refresh - @pglogical ||= PG::LogicalReplication::Client.new(@connection) + @pglogical ||= PG::LogicalReplication::Client.new(@pg_connection) end end From b0795d909c261b3c48d6ed77082a95b3aced75c3 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 29 Apr 2019 14:00:26 -0400 Subject: [PATCH 07/23] Use the #subscriber? method from the client gem --- lib/miq_pglogical.rb | 6 ++---- spec/replication/util/miq_pglogical_spec.rb | 6 ++++++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/miq_pglogical.rb b/lib/miq_pglogical.rb index 46e9ecc5f57..1efd6c8885a 100644 --- a/lib/miq_pglogical.rb +++ b/lib/miq_pglogical.rb @@ -14,6 +14,8 @@ def initialize self.configured_excludes = provider? ? active_excludes : self.class.default_excludes end + delegate :subscriber?, :to => :pglogical + # Sets the tables that should be used to create the publication using refresh_excludes def configured_excludes=(new_excludes) @configured_excludes = new_excludes | ALWAYS_EXCLUDED_TABLES @@ -30,10 +32,6 @@ def provider? pglogical.publications.map { |p| p["name"] }.include?(PUBLICATION_NAME) end - def subscriber? - !pglogical.subscriptions.empty? - end - def configure_provider return if provider? 
create_replication_set diff --git a/spec/replication/util/miq_pglogical_spec.rb b/spec/replication/util/miq_pglogical_spec.rb index 2ab1dd9d442..f5eaec6e798 100644 --- a/spec/replication/util/miq_pglogical_spec.rb +++ b/spec/replication/util/miq_pglogical_spec.rb @@ -12,6 +12,12 @@ end end + describe "#subscriber?" do + it "is false when a subscription is not configured" do + expect(subject.subscriber?).to be false + end + end + describe "#provider?" do it "is false when a provider is not configured" do expect(subject.provider?).to be false From ff604b4c5396299b2a5df2d66499aaf18233e29a Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 29 Apr 2019 14:02:39 -0400 Subject: [PATCH 08/23] Use the #publishes? method for MiqPglogical#provider? --- lib/miq_pglogical.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/miq_pglogical.rb b/lib/miq_pglogical.rb index 1efd6c8885a..a8f71e87dbe 100644 --- a/lib/miq_pglogical.rb +++ b/lib/miq_pglogical.rb @@ -29,7 +29,7 @@ def active_excludes end def provider? 
- pglogical.publications.map { |p| p["name"] }.include?(PUBLICATION_NAME) + pglogical.publishes?(PUBLICATION_NAME) end def configure_provider From ea9167b9bb9fa9dbad8bc3f23ac071b1e0e1eb8f Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 29 Apr 2019 14:03:37 -0400 Subject: [PATCH 09/23] Move the MiqPglogical spec into the regular location Previously it was a separate suite, but now that we're using the built-in replication technology we can put it next to the other lib specs --- spec/{replication/util => lib}/miq_pglogical_spec.rb | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename spec/{replication/util => lib}/miq_pglogical_spec.rb (100%) diff --git a/spec/replication/util/miq_pglogical_spec.rb b/spec/lib/miq_pglogical_spec.rb similarity index 100% rename from spec/replication/util/miq_pglogical_spec.rb rename to spec/lib/miq_pglogical_spec.rb From 81ddc80b7fbe3e7289b51f23966ab7605c210f85 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 29 Apr 2019 15:42:55 -0400 Subject: [PATCH 10/23] Use the new config handler for logical replication failover --- Gemfile | 2 +- lib/evm_database.rb | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Gemfile b/Gemfile index 910887799b4..5efbb829641 100644 --- a/Gemfile +++ b/Gemfile @@ -50,7 +50,7 @@ gem "log_decorator", "~>0.1", :require => false gem "manageiq-api-client", "~>0.3.3", :require => false gem "manageiq-messaging", "~>0.1.4", :require => false gem "manageiq-password", "~>0.3", :require => false -gem "manageiq-postgres_ha_admin", "~>3.0", :require => false +gem "manageiq-postgres_ha_admin", "~>3.0", :git => "https://github.com/carbonin/manageiq-postgres_ha_admin.git", :branch => "add_logical_replication_config_handler", :require => false gem "memoist", "~>0.15.0", :require => false gem "mime-types", "~>3.0", :path => File.expand_path("mime-types-redirector", __dir__) gem "more_core_extensions", "~>3.7" diff --git a/lib/evm_database.rb b/lib/evm_database.rb index 
f0a4b35deb1..e587bcaa228 100644 --- a/lib/evm_database.rb +++ b/lib/evm_database.rb @@ -147,7 +147,7 @@ def self.run_failover_monitor(monitor = nil) monitor ||= ManageIQ::PostgresHaAdmin::FailoverMonitor.new(Rails.root.join("config", "ha_admin.yml")) configure_rails_handler(monitor) - configure_pglogical_handlers(monitor) + configure_logical_replication_handlers(monitor) _log.info("Starting database failover monitor") monitor.monitor_loop @@ -177,12 +177,12 @@ def self.configure_rails_handler(monitor) end private_class_method :configure_rails_handler - def self.configure_pglogical_handlers(monitor) + def self.configure_logical_replication_handlers(monitor) return unless MiqServer.my_server.has_active_role?("database_operations") local_db_conninfo = ActiveRecord::Base.connection.raw_connection.conninfo_hash.delete_blanks PglogicalSubscription.all.each do |s| - handler = ManageIQ::PostgresHaAdmin::PglogicalConfigHandler.new(:subscription => s.id, :conn_info => local_db_conninfo) + handler = ManageIQ::PostgresHaAdmin::LogicalReplicationConfigHandler.new(:subscription => s.id, :conn_info => local_db_conninfo) _log.info("Configuring database failover for replication subscription #{s.id} ") handler.after_failover do |new_conn_info| @@ -193,5 +193,5 @@ def self.configure_pglogical_handlers(monitor) monitor.add_handler(handler) end end - private_class_method :configure_pglogical_handlers + private_class_method :configure_logical_replication_handlers end From 27365fdfc6d12459b744d6492731489e56fb0acf Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 29 Apr 2019 16:19:13 -0400 Subject: [PATCH 11/23] Resync all subscriptions after migrating the database This is needed to allow the subscription to pick up on tables that were newly added to the publication in the remote region. 
--- app/models/pglogical_subscription.rb | 4 ++++ lib/tasks/evm_dbsync.rake | 1 + 2 files changed, 5 insertions(+) diff --git a/app/models/pglogical_subscription.rb b/app/models/pglogical_subscription.rb index 925c0965724..3ca67e56500 100644 --- a/app/models/pglogical_subscription.rb +++ b/app/models/pglogical_subscription.rb @@ -106,6 +106,10 @@ def backlog nil end + def sync_tables + pglogical.sync_subscription(id) + end + # translate the output from the pglogical stored proc to our object columns def self.subscription_to_columns(sub) cols = sub.symbolize_keys diff --git a/lib/tasks/evm_dbsync.rake b/lib/tasks/evm_dbsync.rake index 2574944ed90..7f887afde0e 100644 --- a/lib/tasks/evm_dbsync.rake +++ b/lib/tasks/evm_dbsync.rake @@ -23,6 +23,7 @@ namespace :evm do require 'miq_pglogical' pgl = MiqPglogical.new pgl.refresh_excludes if pgl.provider? + PglogicalSubscription.all.each(&:sync_tables) end end end From c7fbecfc3951aab5aac2b4de4f13c20b4212c103 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 29 Apr 2019 16:20:40 -0400 Subject: [PATCH 12/23] Use the new client gem in the migration patch --- lib/extensions/ar_migration.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/extensions/ar_migration.rb b/lib/extensions/ar_migration.rb index 0116e63987a..a55c493b993 100644 --- a/lib/extensions/ar_migration.rb +++ b/lib/extensions/ar_migration.rb @@ -69,9 +69,11 @@ def wait_message end def restart_subscription - c = SubscriptionHelper.establish_connection.connection - c.pglogical.subscription_disable(subscription.id) - c.pglogical.subscription_enable(subscription.id) + c = SubscriptionHelper.establish_connection.connection.raw_connection + rep_client = PG::LogicalReplication::Client.new(c) + + rep_client.disable_subscription(subscription.id) + rep_client.enable_subscription(subscription.id) ensure SubscriptionHelper.remove_connection end From 03d599a826085e9e4a800e5bb228c3dc25dd995e Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: 
Tue, 30 Apr 2019 14:16:24 -0400 Subject: [PATCH 13/23] Fix spec for logical replication HA config handler --- spec/lib/evm_database_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/lib/evm_database_spec.rb b/spec/lib/evm_database_spec.rb index ba383f880fd..f0cdd769004 100644 --- a/spec/lib/evm_database_spec.rb +++ b/spec/lib/evm_database_spec.rb @@ -155,7 +155,7 @@ expect(h.environment).to eq("test") end - it "adds a pglogical config handler for every subscription when our server has the database_operations role" do + it "adds a logical replication config handler for every subscription when our server has the database_operations role" do ServerRole.seed server.role = "database_operations" server.activate_all_roles @@ -164,7 +164,7 @@ handlers = monitor.config_handlers.map(&:first) expect(handlers.count).to eq(3) - handlers.delete_if { |h| h.kind_of?(ManageIQ::PostgresHaAdmin::RailsConfigHandler) } + handlers.select! { |h| h.kind_of?(ManageIQ::PostgresHaAdmin::LogicalReplicationConfigHandler) } expect(handlers.count).to eq(2) expect(%w(sub_id_1 sub_id_2)).to include(handlers.first.subscription) expect(%w(sub_id_1 sub_id_2)).to include(handlers.last.subscription) From 916c65f0da3b825e41a0b7c4a62898c3041504c0 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 30 Apr 2019 17:18:41 -0400 Subject: [PATCH 14/23] Don't try to wait for pglogical connections to go away on restore There won't be any matching connections and the new logical replication tech doesn't have an analogous connection to wait for. Instead just wait for all the client connections to be gone. This is a new field in pg_stat_activity in PostgreSQL 10. The previous behavior was to only show entries in the view for client connections. But now that the new identifier was added, backend processes are also present and would make our count incorrect. 
We also don't need to change the default scope on VmdbDatabaseConnection because none of these new entries will be specifically connecting to a database (the current scope is limiting us to the currently configured database) --- lib/evm_database_ops.rb | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/evm_database_ops.rb b/lib/evm_database_ops.rb index 0d73d12f667..c7bd178edda 100644 --- a/lib/evm_database_ops.rb +++ b/lib/evm_database_ops.rb @@ -186,13 +186,8 @@ def self.restore(db_opts, connect_opts = {}) end MiqRegion.replication_type = :none - 60.times do - break if VmdbDatabaseConnection.where("application_name LIKE 'pglogical manager%'").count.zero? - _log.info("Waiting for pglogical connections to close...") - sleep 5 - end - connection_count = backup_type == :basebackup ? VmdbDatabaseConnection.unscoped.count : VmdbDatabaseConnection.count + connection_count = backup_type == :basebackup ? VmdbDatabaseConnection.unscoped.where(:backend_type => "client backend").count : VmdbDatabaseConnection.count if connection_count > 1 message = "Database restore failed. #{connection_count - 1} connections remain to the database." _log.error(message) From 07728300ad34945d6611d52e2e77b158863d8979 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 6 May 2019 15:53:46 -0400 Subject: [PATCH 15/23] Disable and unset the subscription slot if the publisher is unreachable Before this change, the failover monitor would not be able to remove the subscription when the remote region failed over because it couldn't remove the replication slot. To deal with this we need to disable the subscription, set the replication slot to the special value "NONE" then retry the delete. We also don't want to do this on every delete because in every other case we want the remote replication slot to be removed when the subscription is removed. 
More details can be found in the PostgreSQL documentation: https://www.postgresql.org/docs/10/sql-dropsubscription.html --- app/models/pglogical_subscription.rb | 11 ++++- spec/models/pglogical_subscription_spec.rb | 48 ++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/app/models/pglogical_subscription.rb b/app/models/pglogical_subscription.rb index 3ca67e56500..54b5cbe1622 100644 --- a/app/models/pglogical_subscription.rb +++ b/app/models/pglogical_subscription.rb @@ -61,7 +61,7 @@ def self.save_all!(subscription_list) end def delete(reload_failover_monitor_service = true) - pglogical.drop_subscription(id, true) + safe_delete MiqRegion.destroy_region(connection, provider_region) EvmDatabase.restart_failover_monitor_service_queue if reload_failover_monitor_service end @@ -175,6 +175,15 @@ def self.find_id(to_find) private + def safe_delete + pglogical.drop_subscription(id, true) + rescue PG::InternalError => e + raise unless e.message =~ /could not connect to publisher/ || e.message =~ /replication slot .* does not exist/ + disable + pglogical.alter_subscription_options(id, "slot_name" => "NONE") + pglogical.drop_subscription(id, true) + end + def remote_region_number with_remote_connection do |_conn| return MiqRegionRemote.region_number_from_sequence diff --git a/spec/models/pglogical_subscription_spec.rb b/spec/models/pglogical_subscription_spec.rb index 4b566a0fe70..40336d90dc3 100644 --- a/spec/models/pglogical_subscription_spec.rb +++ b/spec/models/pglogical_subscription_spec.rb @@ -316,6 +316,54 @@ sub.delete(false) end + + it "removes the subscription when the publisher is unreachable" do + allow(pglogical).to receive(:subscriptions).and_return([subscriptions.first], []) + exception = PG::InternalError.new(<<~MESSAGE) + ERROR: could not connect to publisher when attempting to drop the replication slot "region_#{remote_region1}_subscription" + DETAIL: The error was: could not connect to server: Connection refused + Is the 
server running on host "example.com" and accepting + TCP/IP connections on port 5432? + HINT: Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) to disassociate the subscription from the slot. + MESSAGE + + expect(pglogical).to receive(:drop_subscription).with("region_#{remote_region1}_subscription", true).ordered.and_raise(exception) + expect(sub).to receive(:disable) + expect(pglogical).to receive(:alter_subscription_options).with(sub.id, "slot_name" => "NONE") + expect(pglogical).to receive(:drop_subscription).with("region_#{remote_region1}_subscription", true).ordered + expect(MiqRegion).to receive(:destroy_region) + .with(instance_of(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter), remote_region1) + + sub.delete + end + + it "removes the subscription when the replication slot is missing" do + allow(pglogical).to receive(:subscriptions).and_return([subscriptions.first], []) + exception = PG::InternalError.new(<<~MESSAGE) + ERROR: could not drop the replication slot "NONE" on publisher + DETAIL: The error was: ERROR: replication slot "NONE" does not exist + MESSAGE + + expect(pglogical).to receive(:drop_subscription).with("region_#{remote_region1}_subscription", true).ordered.and_raise(exception) + expect(sub).to receive(:disable) + expect(pglogical).to receive(:alter_subscription_options).with(sub.id, "slot_name" => "NONE") + expect(pglogical).to receive(:drop_subscription).with("region_#{remote_region1}_subscription", true).ordered + expect(MiqRegion).to receive(:destroy_region) + .with(instance_of(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter), remote_region1) + + sub.delete + end + + it "re-raises other PG::InternalErrors" do + allow(pglogical).to receive(:subscriptions).and_return([subscriptions.first], []) + exception = PG::InternalError.new(<<~MESSAGE) + ERROR: badness happened :( + MESSAGE + + expect(pglogical).to receive(:drop_subscription).with("region_#{remote_region1}_subscription", true).ordered.and_raise(exception) + + expect { sub.delete 
}.to raise_error(exception) + end end describe "#validate" do From c620596634c6aa26831a259102b539c010977ade Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 7 May 2019 10:14:08 -0400 Subject: [PATCH 16/23] Use worker count to determine subscription status --- Gemfile | 3 +- app/models/pglogical_subscription.rb | 18 ++++++++- spec/models/pglogical_subscription_spec.rb | 46 +++++++++++++++++++++- 3 files changed, 62 insertions(+), 5 deletions(-) diff --git a/Gemfile b/Gemfile index 5efbb829641..2b83cb1d4b5 100644 --- a/Gemfile +++ b/Gemfile @@ -162,8 +162,7 @@ group :automate, :seed, :manageiq_default do end group :replication, :manageiq_default do - gem "pg-pglogical", "~>2.1.2", :require => false - gem "pg-logical_replication", "~>0.1", :git => "https://github.com/ManageIQ/pg-logical_replication.git", :require => false, :branch => "master" + gem "pg-logical_replication", "~>0.1", :git => "https://github.com/carbonin/pg-logical_replication.git", :require => false, :branch => "add_subscription_worker_count" end group :rest_api, :manageiq_default do diff --git a/app/models/pglogical_subscription.rb b/app/models/pglogical_subscription.rb index 54b5cbe1622..d127da34695 100644 --- a/app/models/pglogical_subscription.rb +++ b/app/models/pglogical_subscription.rb @@ -121,8 +121,8 @@ def self.subscription_to_columns(sub) cols.delete(:remote_replication_lsn) cols.delete(:local_replication_lsn) - cols[:id] = cols.delete(:subscription_name) - cols[:status] = cols.delete(:enabled) ? 
"replicating" : "down" + cols[:id] = cols.delete(:subscription_name) + cols[:status] = subscription_status(cols.delete(:worker_count), cols.delete(:enabled)) # create the individual dsn columns cols.merge!(dsn_attributes(cols.delete(:subscription_dsn))) @@ -131,6 +131,20 @@ def self.subscription_to_columns(sub) end private_class_method :subscription_to_columns + def self.subscription_status(workers, enabled) + return "disabled" unless enabled + + case workers + when 0 + "down" + when 1 + "replicating" + else + "initializing" + end + end + private_class_method :subscription_status + def self.dsn_attributes(dsn) attrs = PG::DSNParser.parse(dsn) attrs.select! { |k, _v| [:dbname, :host, :user, :port].include?(k) } diff --git a/spec/models/pglogical_subscription_spec.rb b/spec/models/pglogical_subscription_spec.rb index 40336d90dc3..966a6b36191 100644 --- a/spec/models/pglogical_subscription_spec.rb +++ b/spec/models/pglogical_subscription_spec.rb @@ -1,11 +1,14 @@ describe PglogicalSubscription do let(:remote_region1) { ApplicationRecord.my_region_number + 1 } let(:remote_region2) { ApplicationRecord.my_region_number + 2 } + let(:remote_region3) { ApplicationRecord.my_region_number + 3 } + let(:remote_region4) { ApplicationRecord.my_region_number + 4 } let(:subscriptions) do [ { "subscription_name" => "region_#{remote_region1}_subscription", "owner" => "root", + "worker_count" => 1, "enabled" => true, "subscription_dsn" => "dbname = 'vmdb\\'s_test' host='example.com' user='root' port='' password='p=as\\' s\\''", "slot_name" => "region_#{remote_region1}_subscription", @@ -13,9 +16,32 @@ "remote_replication_lsn" => "0/420D9A0", "local_replication_lsn" => "18/72DE8268" }, + { + "subscription_name" => "region_#{remote_region3}_subscription", + "owner" => "root", + "worker_count" => 0, + "enabled" => true, + "subscription_dsn" => "dbname=vmdb_development host=example.com user='root' port=5432", + "slot_name" => "region_#{remote_region3}_subscription", + "publications" => 
["miq"], + "remote_replication_lsn" => "0/420D9A0", + "local_replication_lsn" => "18/72DE8268" + }, + { + "subscription_name" => "region_#{remote_region4}_subscription", + "owner" => "root", + "worker_count" => 4, + "enabled" => true, + "subscription_dsn" => "dbname=vmdb_production host=example.com user='root' port=5432", + "slot_name" => "region_#{remote_region4}_subscription", + "publications" => ["miq"], + "remote_replication_lsn" => "0/420D9A0", + "local_replication_lsn" => "18/72DE8268" + }, { "subscription_name" => "region_#{remote_region2}_subscription", "owner" => "root", + "worker_count" => 0, "enabled" => false, "subscription_dsn" => "dbname = vmdb_test2 host=test.example.com user = postgres port=5432 fallback_application_name='bin/rails'", "slot_name" => "region_#{remote_region2}_subscription", @@ -38,8 +64,26 @@ "provider_region_name" => "The region" }, { - "id" => "region_#{remote_region2}_subscription", + "id" => "region_#{remote_region3}_subscription", "status" => "down", + "dbname" => "vmdb_development", + "host" => "example.com", + "user" => "root", + "port" => 5432, + "provider_region" => remote_region3 + }, + { + "id" => "region_#{remote_region4}_subscription", + "status" => "initializing", + "dbname" => "vmdb_production", + "host" => "example.com", + "user" => "root", + "port" => 5432, + "provider_region" => remote_region4 + }, + { + "id" => "region_#{remote_region2}_subscription", + "status" => "disabled", "dbname" => "vmdb_test2", "host" => "test.example.com", "user" => "postgres", From b8ea3581d1ebf5d3f0a23ec29cecb9a10e368290 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 7 May 2019 14:32:10 -0400 Subject: [PATCH 17/23] Only list subscriptions from the current database --- app/models/pglogical_subscription.rb | 3 ++- spec/models/pglogical_subscription_spec.rb | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/app/models/pglogical_subscription.rb b/app/models/pglogical_subscription.rb index d127da34695..99f46079001 
100644 --- a/app/models/pglogical_subscription.rb +++ b/app/models/pglogical_subscription.rb @@ -115,6 +115,7 @@ def self.subscription_to_columns(sub) cols = sub.symbolize_keys # delete the things we dont care about + cols.delete(:database_name) cols.delete(:owner) cols.delete(:slot_name) cols.delete(:publications) @@ -164,7 +165,7 @@ def self.remote_region_attributes(subscription_name) private_class_method :remote_region_attributes def self.subscriptions - pglogical.subscriptions + pglogical.subscriptions(connection.current_database) end private_class_method :subscriptions diff --git a/spec/models/pglogical_subscription_spec.rb b/spec/models/pglogical_subscription_spec.rb index 966a6b36191..b7c15c9b417 100644 --- a/spec/models/pglogical_subscription_spec.rb +++ b/spec/models/pglogical_subscription_spec.rb @@ -7,6 +7,7 @@ [ { "subscription_name" => "region_#{remote_region1}_subscription", + "database_name" => "vmdb_production", "owner" => "root", "worker_count" => 1, "enabled" => true, @@ -18,6 +19,7 @@ }, { "subscription_name" => "region_#{remote_region3}_subscription", + "database_name" => "vmdb_production", "owner" => "root", "worker_count" => 0, "enabled" => true, @@ -29,6 +31,7 @@ }, { "subscription_name" => "region_#{remote_region4}_subscription", + "database_name" => "vmdb_production", "owner" => "root", "worker_count" => 4, "enabled" => true, @@ -40,6 +43,7 @@ }, { "subscription_name" => "region_#{remote_region2}_subscription", + "database_name" => "vmdb_production", "owner" => "root", "worker_count" => 0, "enabled" => false, From 1b98b69e3bc3f485c3eb106ba2e7eb2ac0163099 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 7 May 2019 14:33:48 -0400 Subject: [PATCH 18/23] Remove user-configurable excluded tables Previously the user could choose which tables would be excluded from replication. The original intention of this was to exclude larger tables if a customer was having trouble replicating all the data. 
Since we have moved to a more performant solution this is no longer needed. Additionally to change the set of excluded tables with the built-in logical replication, we need to issue a command on the remote database and on the global database. This would make the user experience for this feature much worse. Because of these two reasons, I'm removing the feature. Now, the set of excluded tables will be set by us by updating config/default_replication_exclude_tables.yml --- config/default_replication_exclude_tables.yml | 1 - lib/miq_pglogical.rb | 42 ++-------- spec/lib/miq_pglogical_spec.rb | 79 +------------------ 3 files changed, 7 insertions(+), 115 deletions(-) diff --git a/config/default_replication_exclude_tables.yml b/config/default_replication_exclude_tables.yml index 8f3940f7cf8..ab7d41af4b0 100644 --- a/config/default_replication_exclude_tables.yml +++ b/config/default_replication_exclude_tables.yml @@ -85,7 +85,6 @@ - server_roles - sessions - vim_performance_states -- vim_performance_tag_values - vmdb_database_metrics - vmdb_databases - vmdb_indexes diff --git a/lib/miq_pglogical.rb b/lib/miq_pglogical.rb index a8f71e87dbe..c16a5589f30 100644 --- a/lib/miq_pglogical.rb +++ b/lib/miq_pglogical.rb @@ -7,27 +7,12 @@ class MiqPglogical PUBLICATION_NAME = 'miq'.freeze ALWAYS_EXCLUDED_TABLES = %w(ar_internal_metadata schema_migrations repl_events repl_monitor repl_nodes).freeze - attr_reader :configured_excludes - def initialize @pg_connection = ApplicationRecord.connection.raw_connection - self.configured_excludes = provider? ? active_excludes : self.class.default_excludes end delegate :subscriber?, :to => :pglogical - # Sets the tables that should be used to create the publication using refresh_excludes - def configured_excludes=(new_excludes) - @configured_excludes = new_excludes | ALWAYS_EXCLUDED_TABLES - end - - # Returns the excluded tables that are currently being used - # @return Array the table list - def active_excludes - return [] unless provider? 
- ApplicationRecord.connection.tables - included_tables - end - def provider? pglogical.publishes?(PUBLICATION_NAME) end @@ -54,23 +39,9 @@ def create_replication_set refresh_excludes end - def self.refresh_excludes_queue(new_excludes) - MiqQueue.put( - :class_name => "MiqPglogical", - :method_name => "refresh_excludes", - :args => [new_excludes] - ) - end - - def self.refresh_excludes(new_excludes) - pgl = new - pgl.configured_excludes = new_excludes - pgl.refresh_excludes - end - # Aligns the contents of the 'miq' publication with the currently configured excludes def refresh_excludes - tables = ApplicationRecord.connection.tables - configured_excludes + tables = ApplicationRecord.connection.tables - default_excludes pglogical.set_publication_tables(PUBLICATION_NAME, tables) end @@ -82,15 +53,12 @@ def replication_wal_retained pglogical.wal_retained_bytes end - def self.default_excludes - YAML.load_file(Rails.root.join("config/default_replication_exclude_tables.yml"))[:exclude_tables] | ALWAYS_EXCLUDED_TABLES + def default_excludes + self.class.default_excludes end - def self.save_remote_region(exclusion_list) - # part of `MiqRegion.replication_type=` is initialization of default subscription list - MiqRegion.replication_type = :remote - # UI is passing empty 'exclution_list' if there are no changes to default subscription list - refresh_excludes(YAML.safe_load(exclusion_list)) unless exclusion_list.empty? 
+ def self.default_excludes + YAML.load_file(Rails.root.join("config/default_replication_exclude_tables.yml"))[:exclude_tables] | ALWAYS_EXCLUDED_TABLES end def self.save_global_region(subscriptions_to_save, subscriptions_to_remove) diff --git a/spec/lib/miq_pglogical_spec.rb b/spec/lib/miq_pglogical_spec.rb index f5eaec6e798..763c669cb86 100644 --- a/spec/lib/miq_pglogical_spec.rb +++ b/spec/lib/miq_pglogical_spec.rb @@ -6,12 +6,6 @@ EvmSpecHelper.local_miq_server end - describe "#active_excludes" do - it "returns an empty array if a provider is not configured" do - expect(subject.active_excludes).to eq([]) - end - end - describe "#subscriber?" do it "is false when a subscription is not configured" do expect(subject.subscriber?).to be false @@ -36,12 +30,6 @@ subject.configure_provider end - describe "#active_excludes" do - it "returns the initial set of excluded tables" do - expect(subject.active_excludes).to eq(ar_connection.tables - subject.included_tables) - end - end - describe "#provider?" 
do it "is true" do expect(subject.provider?).to be true @@ -57,23 +45,9 @@ describe "#create_replication_set" do it "creates the correct initial set" do - expected_excludes = subject.configured_excludes - extra_excludes = subject.configured_excludes - ar_connection.tables + expected_excludes = subject.default_excludes actual_excludes = ar_connection.tables - subject.included_tables - expect(actual_excludes | extra_excludes).to match_array(expected_excludes) - end - end - - describe ".refresh_excludes" do - it "sets the configured excludes and calls refresh on an instance" do - pgl = described_class.new - expect(described_class).to receive(:new).and_return(pgl) - expect(pgl).to receive(:refresh_excludes) - - new_excludes = %w(my new exclude tables) - described_class.refresh_excludes(new_excludes) - - expect(pgl.configured_excludes).to match_array(new_excludes | described_class::ALWAYS_EXCLUDED_TABLES) + expect(actual_excludes | MiqPglogical::ALWAYS_EXCLUDED_TABLES).to match_array(expected_excludes) end end @@ -85,61 +59,12 @@ subject.refresh_excludes expect(subject.included_tables).to include("test") end - - it "removes a newly excluded table" do - table = subject.included_tables.first - subject.configured_excludes += [table] - - expect(subject.active_excludes).not_to include(table) - expect(subject.included_tables).to include(table) - - subject.refresh_excludes - - expect(subject.active_excludes).to include(table) - expect(subject.included_tables).not_to include(table) - end - - it "adds a newly included table" do - current_excludes = subject.configured_excludes - table = current_excludes.pop - subject.configured_excludes = current_excludes - - expect(subject.active_excludes).to include(table) - expect(subject.included_tables).not_to include(table) - - subject.refresh_excludes - - expect(subject.active_excludes).not_to include(table) - expect(subject.included_tables).to include(table) - end - end - end - - describe ".save_remote_region" do - it "sets replication 
type for this region to 'remote'" do - allow(described_class).to receive(:refresh_excludes) - expect(MiqRegion).to receive(:replication_type=).with(:remote) - described_class.save_remote_region("") - end - - it "updates list of tables to be excluded from replication" do - tables = "---\n- vmdb_databases\n- vmdb_indexes\n- vmdb_metrics\n- vmdb_tables\n" - allow(MiqRegion).to receive(:replication_type=) - expect(described_class).to receive(:refresh_excludes).with(YAML.safe_load(tables)) - described_class.save_remote_region(tables) - end - - it "does not updates list of tables to be excluded from replication if passed parameter is empty" do - allow(MiqRegion).to receive(:replication_type=) - expect(described_class).not_to receive(:refresh_excludes) - described_class.save_remote_region("") end end describe ".save_global_region" do let(:subscription) { double } it "sets replication type for this region to 'global'" do - allow(described_class).to receive(:refresh_excludes) expect(MiqRegion).to receive(:replication_type=).with(:global) described_class.save_global_region([], []) end From 157b452baca5097348fdf0e287cfb46025e64013 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 7 May 2019 14:40:25 -0400 Subject: [PATCH 19/23] Don't use "default" for excludes now that we can't change them --- ...ude_tables.yml => replication_exclude_tables.yml} | 0 lib/miq_pglogical.rb | 12 ++++++------ spec/lib/miq_pglogical_spec.rb | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) rename config/{default_replication_exclude_tables.yml => replication_exclude_tables.yml} (100%) diff --git a/config/default_replication_exclude_tables.yml b/config/replication_exclude_tables.yml similarity index 100% rename from config/default_replication_exclude_tables.yml rename to config/replication_exclude_tables.yml diff --git a/lib/miq_pglogical.rb b/lib/miq_pglogical.rb index c16a5589f30..bb1ba9b0de2 100644 --- a/lib/miq_pglogical.rb +++ b/lib/miq_pglogical.rb @@ -39,9 +39,9 @@ def 
create_replication_set refresh_excludes end - # Aligns the contents of the 'miq' publication with the currently configured excludes + # Aligns the contents of the 'miq' publication with the excludes file def refresh_excludes - tables = ApplicationRecord.connection.tables - default_excludes + tables = ApplicationRecord.connection.tables - excludes pglogical.set_publication_tables(PUBLICATION_NAME, tables) end @@ -53,12 +53,12 @@ def replication_wal_retained pglogical.wal_retained_bytes end - def default_excludes - self.class.default_excludes + def excludes + self.class.excludes end - def self.default_excludes - YAML.load_file(Rails.root.join("config/default_replication_exclude_tables.yml"))[:exclude_tables] | ALWAYS_EXCLUDED_TABLES + def self.excludes + YAML.load_file(Rails.root.join("config", "replication_exclude_tables.yml"))[:exclude_tables] | ALWAYS_EXCLUDED_TABLES end def self.save_global_region(subscriptions_to_save, subscriptions_to_remove) diff --git a/spec/lib/miq_pglogical_spec.rb b/spec/lib/miq_pglogical_spec.rb index 763c669cb86..5c789d3bfb8 100644 --- a/spec/lib/miq_pglogical_spec.rb +++ b/spec/lib/miq_pglogical_spec.rb @@ -45,7 +45,7 @@ describe "#create_replication_set" do it "creates the correct initial set" do - expected_excludes = subject.default_excludes + expected_excludes = subject.excludes actual_excludes = ar_connection.tables - subject.included_tables expect(actual_excludes | MiqPglogical::ALWAYS_EXCLUDED_TABLES).to match_array(expected_excludes) end From 1d02c051f3ed498b16b8c5a52d007986698b15ad Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Wed, 8 May 2019 17:57:22 -0400 Subject: [PATCH 20/23] Use released pg-logical_replication --- Gemfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Gemfile b/Gemfile index 2b83cb1d4b5..45467c5e89b 100644 --- a/Gemfile +++ b/Gemfile @@ -162,7 +162,7 @@ group :automate, :seed, :manageiq_default do end group :replication, :manageiq_default do - gem "pg-logical_replication", "~>0.1", 
:git => "https://github.com/carbonin/pg-logical_replication.git", :require => false, :branch => "add_subscription_worker_count" + gem "pg-logical_replication", "~>1.0", :require => false end group :rest_api, :manageiq_default do From ef127801675f81f5838fa6648349c29a4dab9b3c Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Thu, 9 May 2019 10:11:02 -0400 Subject: [PATCH 21/23] Use version 3.1.0 of manageiq-postgres_ha_admin --- Gemfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Gemfile b/Gemfile index 45467c5e89b..efe3e181af9 100644 --- a/Gemfile +++ b/Gemfile @@ -50,7 +50,7 @@ gem "log_decorator", "~>0.1", :require => false gem "manageiq-api-client", "~>0.3.3", :require => false gem "manageiq-messaging", "~>0.1.4", :require => false gem "manageiq-password", "~>0.3", :require => false -gem "manageiq-postgres_ha_admin", "~>3.0", :git => "https://github.com/carbonin/manageiq-postgres_ha_admin.git", :branch => "add_logical_replication_config_handler", :require => false +gem "manageiq-postgres_ha_admin", "~>3.1", :require => false gem "memoist", "~>0.15.0", :require => false gem "mime-types", "~>3.0", :path => File.expand_path("mime-types-redirector", __dir__) gem "more_core_extensions", "~>3.7" From 336bcb16784221cf79036b06934ed2db6882677f Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 13 May 2019 14:37:52 -0400 Subject: [PATCH 22/23] Add a transaction around the multi-step error-case subscription delete This is the situation where the remote server is unreachable so we need to set the replication slot to "NONE" in order to drop the subscription without error. But we don't want to end up in an in-between case where the subscription is disabled, but not deleted so wrap the three commands in a transaction. 
--- app/models/pglogical_subscription.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/app/models/pglogical_subscription.rb b/app/models/pglogical_subscription.rb index 99f46079001..fd73638c79c 100644 --- a/app/models/pglogical_subscription.rb +++ b/app/models/pglogical_subscription.rb @@ -194,9 +194,11 @@ def safe_delete pglogical.drop_subscription(id, true) rescue PG::InternalError => e raise unless e.message =~ /could not connect to publisher/ || e.message =~ /replication slot .* does not exist/ - disable - pglogical.alter_subscription_options(id, "slot_name" => "NONE") - pglogical.drop_subscription(id, true) + connection.transaction do + disable + pglogical.alter_subscription_options(id, "slot_name" => "NONE") + pglogical.drop_subscription(id, true) + end end def remote_region_number From eed3a504bc9ef091718d62cf6bf60cbef001967e Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 13 May 2019 18:18:10 -0400 Subject: [PATCH 23/23] Rename PglogicalSubscription#subscription_status to subscription_attributes We have a .subscription_status method that actually returns the status of a subscription based on the number of workers and the enabled attribute. 
The instance method is actually returning a new copy of the current subscription's attributes for fetching things like the current wal location or the password so we can more accurately name it subscription_attributes --- app/models/pglogical_subscription.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/models/pglogical_subscription.rb b/app/models/pglogical_subscription.rb index fd73638c79c..525285d6b6b 100644 --- a/app/models/pglogical_subscription.rb +++ b/app/models/pglogical_subscription.rb @@ -100,7 +100,7 @@ def validate(new_connection_params = {}) end def backlog - connection.xlog_location_diff(remote_region_lsn, subscription_status["remote_replication_lsn"]) + connection.xlog_location_diff(remote_region_lsn, subscription_attributes["remote_replication_lsn"]) rescue PG::Error => e _log.error(e.message) nil @@ -220,7 +220,7 @@ def update_subscription # sets this instance's password field to the one in the subscription dsn in the database def find_password return password if password.present? - s = subscription_status.symbolize_keys + s = subscription_attributes.symbolize_keys dsn_hash = PG::DSNParser.parse(s.delete(:subscription_dsn)) self.password = dsn_hash[:password] end @@ -262,7 +262,7 @@ def with_remote_connection end end - def subscription_status + def subscription_attributes pglogical.subscriptions.find { |s| s["subscription_name"] == id } end end