diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index 3f2e4c8bca..d681ca6fdc 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -48,7 +48,6 @@ replication_options::replication_options() verbose_commit_log_on_start = false; delay_for_fd_timeout_on_start = false; empty_write_disabled = false; - allow_non_idempotent_write = false; duplication_enabled = true; prepare_timeout_ms_for_secondaries = 1000; @@ -265,11 +264,6 @@ void replication_options::initialize() "empty_write_disabled", empty_write_disabled, "whether to disable empty write, default is false"); - allow_non_idempotent_write = - dsn_config_get_value_bool("replication", - "allow_non_idempotent_write", - allow_non_idempotent_write, - "whether to allow non-idempotent write, default is false"); duplication_enabled = dsn_config_get_value_bool( "replication", "duplication_enabled", duplication_enabled, "is duplication enabled"); diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 5107484b8b..d29f5419ca 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -51,7 +51,6 @@ class replication_options bool verbose_commit_log_on_start; bool delay_for_fd_timeout_on_start; bool empty_write_disabled; - bool allow_non_idempotent_write; bool duplication_enabled; int32_t prepare_timeout_ms_for_secondaries; diff --git a/src/dist/replication/lib/duplication/mutation_batch.cpp b/src/dist/replication/lib/duplication/mutation_batch.cpp index 5ed4dbc512..980db26afe 100644 --- a/src/dist/replication/lib/duplication/mutation_batch.cpp +++ b/src/dist/replication/lib/duplication/mutation_batch.cpp @@ -103,7 +103,14 @@ add_mutation_if_valid(mutation_ptr &mu, mutation_tuple_set &mutations, decree st if (update.code == RPC_REPLICATION_WRITE_EMPTY) { continue; } - + // Ignore non-idempotent writes. + // Normally a duplicating replica will reply non-idempotent writes with + // ERR_OPERATION_DISABLED, but there could still be a mutation written + // before the duplication was added. + // To ignore means this write will be lost, which is acceptable under this rare case. + if (!task_spec::get(update.code)->rpc_request_is_write_idempotent) { + continue; + } blob bb; if (update.data.buffer() != nullptr) { bb = std::move(update.data); diff --git a/src/dist/replication/lib/duplication/test/duplication_test_base.h b/src/dist/replication/lib/duplication/test/duplication_test_base.h index 18237ebe75..1c5ac367df 100644 --- a/src/dist/replication/lib/duplication/test/duplication_test_base.h +++ b/src/dist/replication/lib/duplication/test/duplication_test_base.h @@ -13,6 +13,9 @@ namespace dsn { namespace replication { +DEFINE_STORAGE_WRITE_RPC_CODE(RPC_DUPLICATION_IDEMPOTENT_WRITE, NOT_ALLOW_BATCH, IS_IDEMPOTENT) +DEFINE_STORAGE_WRITE_RPC_CODE(RPC_DUPLICATION_NON_IDEMPOTENT_WRITE, NOT_ALLOW_BATCH, NOT_IDEMPOTENT) + class duplication_test_base : public replica_test_base { public: @@ -55,6 +58,13 @@ class duplication_test_base : public replica_test_base EXPECT_EQ(err, error_s::ok()); return log_file_map; } + + mutation_ptr create_test_mutation(int64_t decree, string_view data) override + { + auto mut = replica_test_base::create_test_mutation(decree, data); + mut->data.updates[0].code = RPC_DUPLICATION_IDEMPOTENT_WRITE; // must be idempotent write + return mut; + } }; } // namespace replication diff --git a/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp index 293e69a13e..f21a79725a 100644 --- a/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp +++ b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp @@ -17,7 +17,7 @@ namespace dsn { namespace replication { -DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT); +DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_PUT, ALLOW_BATCH, IS_IDEMPOTENT) class load_from_private_log_test : public duplication_test_base { diff --git a/src/dist/replication/lib/duplication/test/mutation_batch_test.cpp b/src/dist/replication/lib/duplication/test/mutation_batch_test.cpp index ab6b6be114..f3dc72c891 100644 --- a/src/dist/replication/lib/duplication/test/mutation_batch_test.cpp +++ b/src/dist/replication/lib/duplication/test/mutation_batch_test.cpp @@ -24,14 +24,15 @@ * THE SOFTWARE. */ -#include "dist/replication/test/replica_test/unit_test/replica_test_base.h" +#include "duplication_test_base.h" #include "dist/replication/lib/duplication/mutation_batch.h" namespace dsn { namespace replication { -class mutation_batch_test : public replica_test_base +class mutation_batch_test : public duplication_test_base { +public: }; TEST_F(mutation_batch_test, add_mutation_if_valid) @@ -59,5 +60,16 @@ TEST_F(mutation_batch_test, add_mutation_if_valid) ASSERT_EQ(result.size(), 2); } +TEST_F(mutation_batch_test, ignore_non_idempotent_write) +{ + mutation_tuple_set result; + + std::string s = "hello"; + mutation_ptr mu = create_test_mutation(1, s); + mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE; + add_mutation_if_valid(mu, result, 0); + ASSERT_EQ(result.size(), 0); +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 4c3cc4c995..4cf4a0c119 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -56,7 +56,8 @@ replica::replica( _cur_download_size(0), _restore_progress(0), _restore_status(ERR_OK), - _duplication_mgr(new replica_duplicator_manager(this)) + _duplication_mgr(new replica_duplicator_manager(this)), + _duplicating(app.duplicating) { dassert(_app_info.app_type != "", ""); dassert(stub != nullptr, ""); @@ -79,6 +80,10 @@ replica::replica( _counter_recent_write_throttling_reject_count.init_app_counter( "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); + counter_str = fmt::format("dup.disabled_non_idempotent_write_count@{}", _app_info.app_name); + _counter_dup_disabled_non_idempotent_write_count.init_app_counter( + "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); + // init table level latency perf counters init_table_level_latency_counters(); diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index f33945be80..dc423db979 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -174,7 +174,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // Duplication // replica_duplicator_manager *get_duplication_manager() const { return _duplication_mgr.get(); } - bool is_duplicating() const { return _app_info.duplicating; } + bool is_duplicating() const { return _duplicating; } void update_last_checkpoint_generate_time(); @@ -495,6 +495,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // duplication std::unique_ptr _duplication_mgr; + bool _duplicating{false}; // partition split // _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition @@ -512,6 +513,7 @@ class replica : public serverlet, public ref_counter, public replica_ba perf_counter_wrapper _counter_recent_write_throttling_delay_count; perf_counter_wrapper _counter_recent_write_throttling_reject_count; std::vector _counters_table_level_latency; + perf_counter_wrapper _counter_dup_disabled_non_idempotent_write_count; perf_counter_wrapper _counter_backup_request_qps; dsn::task_tracker _tracker; diff --git a/src/dist/replication/lib/replica_2pc.cpp b/src/dist/replication/lib/replica_2pc.cpp index 67b8020ffe..f9cf2ce4df 100644 --- a/src/dist/replication/lib/replica_2pc.cpp +++ b/src/dist/replication/lib/replica_2pc.cpp @@ -58,7 +58,10 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) } task_spec *spec = task_spec::get(request->rpc_code()); - if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) { + if (is_duplicating() && !spec->rpc_request_is_write_idempotent) { + // Ignore non-idempotent write, because duplication provides no guarantee of atomicity to + // make this write produce the same result on multiple clusters. + _counter_dup_disabled_non_idempotent_write_count->increment(); response_client_write(request, ERR_OPERATION_DISABLED); return; } diff --git a/src/dist/replication/lib/replica_config.cpp b/src/dist/replication/lib/replica_config.cpp index 4ed0f11063..607eccd5d9 100644 --- a/src/dist/replication/lib/replica_config.cpp +++ b/src/dist/replication/lib/replica_config.cpp @@ -976,6 +976,7 @@ void replica::on_config_sync(const app_info &info, const partition_configuration return; update_app_envs(info.envs); + _duplicating = info.duplicating; if (status() == partition_status::PS_PRIMARY || nullptr != _primary_states.reconfiguration_task) { diff --git a/src/dist/replication/test/replica_test/unit_test/replica_test_base.h b/src/dist/replication/test/replica_test/unit_test/replica_test_base.h index 773c23b8d1..0926afe992 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_test_base.h +++ b/src/dist/replication/test/replica_test/unit_test/replica_test_base.h @@ -55,7 +55,7 @@ struct replica_test_base : replica_stub_test_base replica_test_base() { _replica = create_mock_replica(stub.get(), 1, 1, _log_dir.c_str()); } - mutation_ptr create_test_mutation(int64_t decree, string_view data) + virtual mutation_ptr create_test_mutation(int64_t decree, string_view data) { mutation_ptr mu(new mutation()); mu->data.header.ballot = 1;