diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index a6bd95602d..925e2f9166 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -35,10 +35,11 @@ DEFINE_THREAD_POOL_CODE(THREAD_POOL_LOCAL_APP) DEFINE_THREAD_POOL_CODE(THREAD_POOL_REPLICATION_LONG) DEFINE_THREAD_POOL_CODE(THREAD_POOL_COMPACT) -#define DEFINE_STORAGE_WRITE_RPC_CODE(x, allow_batch) \ - DEFINE_STORAGE_RPC_CODE(x, TASK_PRIORITY_LOW, THREAD_POOL_REPLICATION, true, allow_batch) +#define DEFINE_STORAGE_WRITE_RPC_CODE(x, allow_batch, is_idempotent) \ + DEFINE_STORAGE_RPC_CODE( \ + x, TASK_PRIORITY_LOW, THREAD_POOL_REPLICATION, true, allow_batch, is_idempotent) #define DEFINE_STORAGE_READ_RPC_CODE(x) \ - DEFINE_STORAGE_RPC_CODE(x, TASK_PRIORITY_COMMON, THREAD_POOL_LOCAL_APP, false, true) + DEFINE_STORAGE_RPC_CODE(x, TASK_PRIORITY_COMMON, THREAD_POOL_LOCAL_APP, false, true, true) #define MAKE_EVENT_CODE(x, pri) DEFINE_TASK_CODE(x, pri, CURRENT_THREAD_POOL) #define MAKE_EVENT_CODE_AIO(x, pri) DEFINE_TASK_CODE_AIO(x, pri, CURRENT_THREAD_POOL) diff --git a/include/dsn/tool-api/task_code.h b/include/dsn/tool-api/task_code.h index 5c672f15c9..15738c9519 100644 --- a/include/dsn/tool-api/task_code.h +++ b/include/dsn/tool-api/task_code.h @@ -102,7 +102,8 @@ class task_code dsn_task_priority_t pri, dsn::threadpool_code pool, bool is_storage_write, - bool allow_batch); + bool allow_batch, + bool is_idempotent); const char *to_string() const; @@ -153,7 +154,9 @@ class task_code // 2. for a write rpc, a primary may also need to replicate it // to secondaries before forwarding to the storage engine. // 3. some storage engine's rpc shouldn't be batched, -// either for better performance or correctness +// either for better performance or correctness. +// 4. some write rpcs are idempotent, but some are not. +// we should differentiate them. 
// so we define some specical fields in task_spec to mark these features. // // please refer to rpc_engine::on_recv_request for the detailes on how storage_engine's rpc @@ -162,11 +165,21 @@ class task_code // Notice we dispatch storage rpc's response to THREAD_POOL_DEFAULT, // the reason is that the storage rpc's response mainly runs at client side, which is not // necessary to start so many threadpools -#define DEFINE_STORAGE_RPC_CODE(x, pri, pool, is_write, allow_batch) \ +#define DEFINE_STORAGE_RPC_CODE(x, pri, pool, is_write, allow_batch, is_idempotent) \ __selectany const ::dsn::task_code x( \ - #x, TASK_TYPE_RPC_REQUEST, pri, pool, is_write, allow_batch); \ - __selectany const ::dsn::task_code x##_ACK( \ - #x "_ACK", TASK_TYPE_RPC_RESPONSE, pri, THREAD_POOL_DEFAULT, is_write, allow_batch); + #x, TASK_TYPE_RPC_REQUEST, pri, pool, is_write, allow_batch, is_idempotent); \ + __selectany const ::dsn::task_code x##_ACK(#x "_ACK", \ + TASK_TYPE_RPC_RESPONSE, \ + pri, \ + THREAD_POOL_DEFAULT, \ + is_write, \ + allow_batch, \ + is_idempotent); + +#define ALLOW_BATCH true +#define NOT_ALLOW_BATCH false +#define IS_IDEMPOTENT true +#define NOT_IDEMPOTENT false // define a default task code "task_code_invalid", it's mainly used for representing // some error status when you want to return task_code in some functions. 
diff --git a/include/dsn/tool-api/task_spec.h b/include/dsn/tool-api/task_spec.h index 30f9ae2abf..c27e9a5603 100644 --- a/include/dsn/tool-api/task_spec.h +++ b/include/dsn/tool-api/task_spec.h @@ -163,7 +163,8 @@ class task_spec : public extensible_object dsn_task_priority_t pri, dsn::threadpool_code pool, bool is_write_operation, - bool allow_batch); + bool allow_batch, + bool is_idempotent); public: // not configurable [ @@ -176,6 +177,7 @@ class task_spec : public extensible_object bool rpc_request_for_storage; bool rpc_request_is_write_operation; // need stateful replication bool rpc_request_is_write_allow_batch; // if write allow batch + bool rpc_request_is_write_idempotent; // if write operation is idempotent // ] // configurable [ diff --git a/src/apps/skv/simple_kv.code.definition.h b/src/apps/skv/simple_kv.code.definition.h index 02ffd8034b..41b624c8a4 100644 --- a/src/apps/skv/simple_kv.code.definition.h +++ b/src/apps/skv/simple_kv.code.definition.h @@ -41,8 +41,8 @@ namespace replication { namespace application { DEFINE_STORAGE_READ_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_READ) -DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, true) -DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, true) +DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, ALLOW_BATCH, IS_IDEMPOTENT) +DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, ALLOW_BATCH, NOT_IDEMPOTENT) // test timer task code DEFINE_TASK_CODE(LPC_SIMPLE_KV_TEST_TIMER, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT) diff --git a/src/core/core/partition_resolver_simple.cpp b/src/core/core/partition_resolver_simple.cpp index 362f2251ca..75cf272e21 100644 --- a/src/core/core/partition_resolver_simple.cpp +++ b/src/core/core/partition_resolver_simple.cpp @@ -83,6 +83,8 @@ void partition_resolver_simple::on_access_failure(int partition_index, error_cod && err != ERR_NOT_ENOUGH_MEMBER // primary won't change and we only r/w on primary in this // provider + && + err != 
ERR_OPERATION_DISABLED // operation disabled ) { ddebug("clear partition configuration cache %d.%d due to access failure %s", _app_id, @@ -397,9 +399,8 @@ void partition_resolver_simple::handle_pending_requests(std::dequeheader->gpid.value() != 0 && err != ERR_OK && - err != ERR_HANDLER_NOT_FOUND && err != ERR_APP_NOT_EXIST) { + err != ERR_HANDLER_NOT_FOUND && err != ERR_APP_NOT_EXIST && + err != ERR_OPERATION_DISABLED) { auto resolver = req2->server_address.uri_address()->get_resolver(); if (nullptr != resolver) { resolver->on_access_failure(req2->header->gpid.get_partition_index(), err); diff --git a/src/core/core/task_code.cpp b/src/core/core/task_code.cpp index 3adaa41d4b..2624e0dfb5 100644 --- a/src/core/core/task_code.cpp +++ b/src/core/core/task_code.cpp @@ -48,10 +48,12 @@ task_code::task_code(const char *name, dsn_task_priority_t pri, dsn::threadpool_code pool, bool is_storage_write, - bool allow_batch) + bool allow_batch, + bool is_idempotent) : task_code(name) { - task_spec::register_storage_task_code(*this, tt, pri, pool, is_storage_write, allow_batch); + task_spec::register_storage_task_code( + *this, tt, pri, pool, is_storage_write, allow_batch, is_idempotent); } const char *task_code::to_string() const diff --git a/src/core/core/task_spec.cpp b/src/core/core/task_spec.cpp index 78008512fd..12755014cc 100644 --- a/src/core/core/task_spec.cpp +++ b/src/core/core/task_spec.cpp @@ -100,13 +100,15 @@ void task_spec::register_storage_task_code(task_code code, dsn_task_priority_t pri, threadpool_code pool, bool is_write_operation, - bool allow_batch) + bool allow_batch, + bool is_idempotent) { register_task_code(code, type, pri, pool); task_spec *spec = task_spec::get(code); spec->rpc_request_for_storage = true; spec->rpc_request_is_write_operation = is_write_operation; spec->rpc_request_is_write_allow_batch = allow_batch; + spec->rpc_request_is_write_idempotent = is_idempotent; } task_spec *task_spec::get(int code) @@ -126,6 +128,7 @@ 
task_spec::task_spec(int code, rpc_request_for_storage(false), rpc_request_is_write_operation(false), rpc_request_is_write_allow_batch(false), + rpc_request_is_write_idempotent(false), priority(pri), pool_code(pool), rpc_call_header_format(NET_HDR_DSN), diff --git a/src/dist/replication/client_lib/replication_common.cpp b/src/dist/replication/client_lib/replication_common.cpp index 8fdf302eb7..58632c2f2c 100644 --- a/src/dist/replication/client_lib/replication_common.cpp +++ b/src/dist/replication/client_lib/replication_common.cpp @@ -47,6 +47,7 @@ replication_options::replication_options() verbose_commit_log_on_start = false; delay_for_fd_timeout_on_start = false; empty_write_disabled = false; + allow_non_idempotent_write = false; prepare_timeout_ms_for_secondaries = 1000; prepare_timeout_ms_for_potential_secondaries = 3000; @@ -256,6 +257,11 @@ void replication_options::initialize() "empty_write_disabled", empty_write_disabled, "whether to disable empty write, default is false"); + allow_non_idempotent_write = + dsn_config_get_value_bool("replication", + "allow_non_idempotent_write", + allow_non_idempotent_write, + "whether to allow non-idempotent write, default is false"); prepare_timeout_ms_for_secondaries = (int)dsn_config_get_value_uint64( "replication", diff --git a/src/dist/replication/client_lib/replication_common.h b/src/dist/replication/client_lib/replication_common.h index d32a897cd5..cdba0dbd09 100644 --- a/src/dist/replication/client_lib/replication_common.h +++ b/src/dist/replication/client_lib/replication_common.h @@ -63,6 +63,7 @@ class replication_options bool verbose_commit_log_on_start; bool delay_for_fd_timeout_on_start; bool empty_write_disabled; + bool allow_non_idempotent_write; int32_t prepare_timeout_ms_for_secondaries; int32_t prepare_timeout_ms_for_potential_secondaries; diff --git a/src/dist/replication/lib/replica_2pc.cpp b/src/dist/replication/lib/replica_2pc.cpp index 58a969746e..01eeeae490 100644 --- 
a/src/dist/replication/lib/replica_2pc.cpp +++ b/src/dist/replication/lib/replica_2pc.cpp @@ -46,6 +46,12 @@ void replica::on_client_write(task_code code, dsn_message_t request) { check_hashed_access(); + task_spec *spec = task_spec::get(code); + if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) { + response_client_message(false, request, ERR_OPERATION_DISABLED); + return; + } + if (partition_status::PS_PRIMARY != status()) { response_client_message(false, request, ERR_INVALID_STATE); return; diff --git a/src/dist/replication/test/simple_kv/simple_kv.code.definition.h b/src/dist/replication/test/simple_kv/simple_kv.code.definition.h index 4647c2ac7d..3885c60a21 100644 --- a/src/dist/replication/test/simple_kv/simple_kv.code.definition.h +++ b/src/dist/replication/test/simple_kv/simple_kv.code.definition.h @@ -31,8 +31,8 @@ namespace replication { namespace test { DEFINE_STORAGE_READ_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_READ) -DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, true) -DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, true) +DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_WRITE, ALLOW_BATCH, IS_IDEMPOTENT) +DEFINE_STORAGE_WRITE_RPC_CODE(RPC_SIMPLE_KV_SIMPLE_KV_APPEND, ALLOW_BATCH, NOT_IDEMPOTENT) // test timer task code DEFINE_TASK_CODE(LPC_SIMPLE_KV_TEST_TIMER, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)