diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h
index 0b6183d33f..4f52b962ec 100644
--- a/src/base/pegasus_key_schema.h
+++ b/src/base/pegasus_key_schema.h
@@ -150,4 +150,10 @@ inline uint64_t pegasus_key_hash(const ::dsn::blob &key)
     }
 }
 
+/// Calculate hash value from hash key.
+inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key)
+{
+    return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0); // CRC64 over the whole hash key; the trailing 0 is presumably the initial CRC (confirm)
+}
+
 } // namespace pegasus
diff --git a/src/base/pegasus_rpc_types.h b/src/base/pegasus_rpc_types.h
index 3d7373413d..ac1eff49cd 100644
--- a/src/base/pegasus_rpc_types.h
+++ b/src/base/pegasus_rpc_types.h
@@ -22,6 +22,8 @@ using incr_rpc = dsn::rpc_holder;
 
+using duplicate_rpc = dsn::apps::duplicate_rpc; // alias of the rpc holder typedef'd in rrdb.client.h
+
 using check_and_mutate_rpc = dsn::rpc_holder;
 
diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp
index 441b1f9d9d..ef9e9a08c9 100644
--- a/src/base/rrdb_types.cpp
+++ b/src/base/rrdb_types.cpp
@@ -4254,5 +4254,288 @@ void scan_response::printTo(std::ostream &out) const
         << "server=" << to_string(server);
     out << ")";
 }
+
+duplicate_request::~duplicate_request() throw() {}
+
+void duplicate_request::__set_timestamp(const int64_t val)
+{
+    this->timestamp = val;
+    __isset.timestamp = true; // mark the optional field as present
+}
+
+void duplicate_request::__set_task_code(const ::dsn::task_code &val)
+{
+    this->task_code = val;
+    __isset.task_code = true; // mark the optional field as present
+}
+
+void duplicate_request::__set_raw_message(const ::dsn::blob &val)
+{
+    this->raw_message = val;
+    __isset.raw_message = true; // mark the optional field as present
+}
+
+void duplicate_request::__set_cluster_id(const int8_t val)
+{
+    this->cluster_id = val;
+    __isset.cluster_id = true; // mark the optional field as present
+}
+
+uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot) // returns the accumulated xfer counts reported by iprot
+{
+
+    apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+    uint32_t xfer = 0;
+    std::string fname;
+    ::apache::thrift::protocol::TType ftype;
+    int16_t fid;
+
+    xfer += iprot->readStructBegin(fname);
+
+    using ::apache::thrift::protocol::TProtocolException;
+
+    while (true) { // one iteration per field, until the T_STOP marker
+        xfer += iprot->readFieldBegin(fname, ftype, fid);
+        if (ftype == ::apache::thrift::protocol::T_STOP) {
+            break;
+        }
+        switch (fid) {
+        case 1: // timestamp, accepted only as T_I64
+            if (ftype == ::apache::thrift::protocol::T_I64) {
+                xfer += iprot->readI64(this->timestamp);
+                this->__isset.timestamp = true;
+            } else {
+                xfer += iprot->skip(ftype); // wire type mismatch: skip the value
+            }
+            break;
+        case 2: // task_code, accepted only as T_STRUCT
+            if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+                xfer += this->task_code.read(iprot);
+                this->__isset.task_code = true;
+            } else {
+                xfer += iprot->skip(ftype);
+            }
+            break;
+        case 3: // raw_message, accepted only as T_STRUCT
+            if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+                xfer += this->raw_message.read(iprot);
+                this->__isset.raw_message = true;
+            } else {
+                xfer += iprot->skip(ftype);
+            }
+            break;
+        case 4: // cluster_id, accepted only as T_BYTE
+            if (ftype == ::apache::thrift::protocol::T_BYTE) {
+                xfer += iprot->readByte(this->cluster_id);
+                this->__isset.cluster_id = true;
+            } else {
+                xfer += iprot->skip(ftype);
+            }
+            break;
+        default: // unknown field id: skip it
+            xfer += iprot->skip(ftype);
+            break;
+        }
+        xfer += iprot->readFieldEnd();
+    }
+
+    xfer += iprot->readStructEnd();
+
+    return xfer;
+}
+
+uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) const // fields are emitted only when their __isset flag is set
+{
+    uint32_t xfer = 0;
+    apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
+    xfer += oprot->writeStructBegin("duplicate_request");
+
+    if (this->__isset.timestamp) {
+        xfer += oprot->writeFieldBegin("timestamp", ::apache::thrift::protocol::T_I64, 1);
+        xfer += oprot->writeI64(this->timestamp);
+        xfer += oprot->writeFieldEnd();
+    }
+    if (this->__isset.task_code) {
+        xfer += oprot->writeFieldBegin("task_code", ::apache::thrift::protocol::T_STRUCT, 2);
+        xfer += this->task_code.write(oprot);
+        xfer += oprot->writeFieldEnd();
+    }
+    if (this->__isset.raw_message) {
+        xfer += oprot->writeFieldBegin("raw_message", ::apache::thrift::protocol::T_STRUCT, 3);
+        xfer += this->raw_message.write(oprot);
+        xfer += oprot->writeFieldEnd();
+    }
+    if (this->__isset.cluster_id) {
+        xfer += oprot->writeFieldBegin("cluster_id", ::apache::thrift::protocol::T_BYTE, 4);
+        xfer += oprot->writeByte(this->cluster_id);
+        xfer += oprot->writeFieldEnd();
+    }
+    xfer += oprot->writeFieldStop();
+    xfer += oprot->writeStructEnd();
+    return xfer;
+}
+
+void swap(duplicate_request &a, duplicate_request &b) // member-wise swap, including the __isset flags
+{
+    using ::std::swap;
+    swap(a.timestamp, b.timestamp);
+    swap(a.task_code, b.task_code);
+    swap(a.raw_message, b.raw_message);
+    swap(a.cluster_id, b.cluster_id);
+    swap(a.__isset, b.__isset);
+}
+
+duplicate_request::duplicate_request(const duplicate_request &other126)
+{
+    timestamp = other126.timestamp;
+    task_code = other126.task_code;
+    raw_message = other126.raw_message;
+    cluster_id = other126.cluster_id;
+    __isset = other126.__isset;
+}
+duplicate_request::duplicate_request(duplicate_request &&other127)
+{
+    timestamp = std::move(other127.timestamp);
+    task_code = std::move(other127.task_code);
+    raw_message = std::move(other127.raw_message);
+    cluster_id = std::move(other127.cluster_id);
+    __isset = std::move(other127.__isset);
+}
+duplicate_request &duplicate_request::operator=(const duplicate_request &other128)
+{
+    timestamp = other128.timestamp;
+    task_code = other128.task_code;
+    raw_message = other128.raw_message;
+    cluster_id = other128.cluster_id;
+    __isset = other128.__isset;
+    return *this;
+}
+duplicate_request &duplicate_request::operator=(duplicate_request &&other129)
+{
+    timestamp = std::move(other129.timestamp);
+    task_code = std::move(other129.task_code);
+    raw_message = std::move(other129.raw_message);
+    cluster_id = std::move(other129.cluster_id);
+    __isset = std::move(other129.__isset);
+    return *this;
+}
+void duplicate_request::printTo(std::ostream &out) const // unset fields print the "" placeholder instead of their value
+{
+    using ::apache::thrift::to_string;
+    out << "duplicate_request(";
+    out << "timestamp=";
+    (__isset.timestamp ? (out << to_string(timestamp)) : (out << ""));
+    out << ", "
+        << "task_code=";
+    (__isset.task_code ? (out << to_string(task_code)) : (out << ""));
+    out << ", "
+        << "raw_message=";
+    (__isset.raw_message ? (out << to_string(raw_message)) : (out << ""));
+    out << ", "
+        << "cluster_id=";
+    (__isset.cluster_id ? (out << to_string(cluster_id)) : (out << ""));
+    out << ")";
+}
+
+duplicate_response::~duplicate_response() throw() {}
+
+void duplicate_response::__set_error(const int32_t val)
+{
+    this->error = val;
+    __isset.error = true; // mark the optional field as present
+}
+
+uint32_t duplicate_response::read(::apache::thrift::protocol::TProtocol *iprot) // returns the accumulated xfer counts reported by iprot
+{
+
+    apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+    uint32_t xfer = 0;
+    std::string fname;
+    ::apache::thrift::protocol::TType ftype;
+    int16_t fid;
+
+    xfer += iprot->readStructBegin(fname);
+
+    using ::apache::thrift::protocol::TProtocolException;
+
+    while (true) { // one iteration per field, until the T_STOP marker
+        xfer += iprot->readFieldBegin(fname, ftype, fid);
+        if (ftype == ::apache::thrift::protocol::T_STOP) {
+            break;
+        }
+        switch (fid) {
+        case 1: // error, accepted only as T_I32
+            if (ftype == ::apache::thrift::protocol::T_I32) {
+                xfer += iprot->readI32(this->error);
+                this->__isset.error = true;
+            } else {
+                xfer += iprot->skip(ftype); // wire type mismatch: skip the value
+            }
+            break;
+        default: // unknown field id: skip it
+            xfer += iprot->skip(ftype);
+            break;
+        }
+        xfer += iprot->readFieldEnd();
+    }
+
+    xfer += iprot->readStructEnd();
+
+    return xfer;
+}
+
+uint32_t duplicate_response::write(::apache::thrift::protocol::TProtocol *oprot) const // `error` is emitted only when its __isset flag is set
+{
+    uint32_t xfer = 0;
+    apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
+    xfer += oprot->writeStructBegin("duplicate_response");
+
+    if (this->__isset.error) {
+        xfer += oprot->writeFieldBegin("error", ::apache::thrift::protocol::T_I32, 1);
+        xfer += oprot->writeI32(this->error);
+        xfer += oprot->writeFieldEnd();
+    }
+    xfer += oprot->writeFieldStop();
+    xfer += oprot->writeStructEnd();
+    return xfer;
+}
+
+void swap(duplicate_response &a, duplicate_response &b) // member-wise swap, including the __isset flags
+{
+    using ::std::swap;
+    swap(a.error, b.error);
+    swap(a.__isset, b.__isset);
+}
+
+duplicate_response::duplicate_response(const duplicate_response &other130)
+{
+    error = other130.error;
+    __isset = other130.__isset;
+}
+duplicate_response::duplicate_response(duplicate_response &&other131)
+{
+    error = std::move(other131.error);
+    __isset = std::move(other131.__isset);
+}
+duplicate_response &duplicate_response::operator=(const duplicate_response &other132)
+{
+    error = other132.error;
+    __isset = other132.__isset;
+    return *this;
+}
+duplicate_response &duplicate_response::operator=(duplicate_response &&other133)
+{
+    error = std::move(other133.error);
+    __isset = std::move(other133.__isset);
+    return *this;
+}
+void duplicate_response::printTo(std::ostream &out) const // an unset `error` prints the "" placeholder instead of its value
+{
+    using ::apache::thrift::to_string;
+    out << "duplicate_response(";
+    out << "error=";
+    (__isset.error ? (out << to_string(error)) : (out << ""));
+    out << ")";
+}
 }
 } // namespace
diff --git a/src/client_lib/pegasus_client_impl.cpp b/src/client_lib/pegasus_client_impl.cpp
index fcf7b2a33d..f866a90eb1 100644
--- a/src/client_lib/pegasus_client_impl.cpp
+++ b/src/client_lib/pegasus_client_impl.cpp
@@ -1244,6 +1244,13 @@ int pegasus_client_impl::get_unordered_scanners(int max_split_count,
     return ret;
 }
 
+void pegasus_client_impl::async_duplicate(dsn::apps::duplicate_rpc rpc,
+                                          std::function &&callback,
+                                          dsn::task_tracker *tracker)
+{
+    _client->duplicate(rpc, std::move(callback), tracker); // forwards the rpc, callback and tracker to the underlying client unchanged
+}
+
 const char *pegasus_client_impl::get_error_string(int error_code) const
 {
     auto it = _client_error_to_string.find(error_code);
diff --git a/src/client_lib/pegasus_client_impl.h b/src/client_lib/pegasus_client_impl.h
index b4edd0e1cb..1d11a84bc7 100644
--- a/src/client_lib/pegasus_client_impl.h
+++ b/src/client_lib/pegasus_client_impl.h
@@ -221,6 +221,13 @@ class pegasus_client_impl : public pegasus_client
                                const scan_options &options,
                                async_get_unordered_scanners_callback_t &&callback) override;
 
+    /// \internal
+    /// This is an internal function for duplication.
+    /// \see pegasus::server::pegasus_mutation_duplicator
+    void async_duplicate(dsn::apps::duplicate_rpc rpc,
+                         std::function &&callback,
+                         dsn::task_tracker *tracker);
+
     virtual const char *get_error_string(int error_code) const override;
 
     static void init_error();
@@ -279,6 +286,9 @@ class pegasus_client_impl : public pegasus_client
         static const ::dsn::blob _max;
     };
 
+    static int get_client_error(int server_error); // moved ahead of the private section by this patch; the duplicator calls it from outside the class
+    static int get_rocksdb_server_error(int rocskdb_error);
+
 private:
     class pegasus_scanner_impl_wrapper : public abstract_pegasus_scanner
     {
@@ -298,9 +308,6 @@ class pegasus_client_impl : public pegasus_client
         }
     };
 
-    static int get_client_error(int server_error);
-    static int get_rocksdb_server_error(int rocskdb_error);
-
 private:
     std::string _cluster_name;
     std::string _app_name;
diff --git a/src/idl/dsn.thrift b/src/idl/dsn.thrift
index 626e57bca0..aefba84d86 100644
--- a/src/idl/dsn.thrift
+++ b/src/idl/dsn.thrift
@@ -4,3 +4,7 @@ namespace cpp dsn
 struct blob
 {
 }
+
+struct task_code
+{
+}
diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift
index 07f55ba94b..5718c95f9f 100644
--- a/src/idl/rrdb.thrift
+++ b/src/idl/rrdb.thrift
@@ -251,6 +251,26 @@ struct scan_response
     6:string server;
 }
 
+struct duplicate_request
+{
+    // The timestamp of this write.
+    1: optional i64 timestamp
+
+    // The code to identify this write.
+    2: optional dsn.task_code task_code
+
+    // The binary form of the write.
+    3: optional dsn.blob raw_message
+
+    // ID of the cluster where this write comes from.
+    4: optional byte cluster_id
+}
+
+struct duplicate_response
+{
+    1: optional i32 error; // NOTE(review): the duplicator feeds this value to get_rocksdb_server_error(), so it is presumably a rocksdb status code - confirm
+}
+
 service rrdb
 {
     update_response put(1:update_request update);
diff --git a/src/include/rrdb/rrdb.client.h b/src/include/rrdb/rrdb.client.h
index 553817e705..438eefa6cb 100644
--- a/src/include/rrdb/rrdb.client.h
+++ b/src/include/rrdb/rrdb.client.h
@@ -8,6 +8,9 @@
 
 namespace dsn {
 namespace apps {
+
+typedef rpc_holder duplicate_rpc;
+
 class rrdb_client
 {
 public:
@@ -405,6 +408,15 @@ class rrdb_client
                         partition_hash);
     }
 
+    // ---------- call RPC_RRDB_RRDB_DUPLICATE ------------
+
+    // - asynchronous with on-stack duplicate_request and duplicate_response
+    template
+    task_ptr duplicate(duplicate_rpc &rpc, TCallback &&callback, dsn::task_tracker *tracker)
+    {
+        return rpc.call(_resolver, tracker, std::forward(callback)); // issues the call through this client's partition resolver
+    }
+
 private:
     dsn::replication::partition_resolver_ptr _resolver;
     dsn::task_tracker _tracker;
diff --git a/src/include/rrdb/rrdb.code.definition.h b/src/include/rrdb/rrdb.code.definition.h
index 4d4a1bbd9f..621b49497a 100644
--- a/src/include/rrdb/rrdb.code.definition.h
+++ b/src/include/rrdb/rrdb.code.definition.h
@@ -10,6 +10,7 @@ DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_MULTI_REMOVE, NOT_ALLOW_BATCH, IS_ID
 DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_INCR, NOT_ALLOW_BATCH, NOT_IDEMPOTENT)
 DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_CHECK_AND_SET, NOT_ALLOW_BATCH, NOT_IDEMPOTENT)
 DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_CHECK_AND_MUTATE, NOT_ALLOW_BATCH, NOT_IDEMPOTENT)
+DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_DUPLICATE, NOT_ALLOW_BATCH, IS_IDEMPOTENT) // declared as a non-batched, idempotent storage write
 DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_GET)
 DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_MULTI_GET)
 DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_SORTKEY_COUNT)
diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h
index 563019414c..b9ed126f5a 100644
--- a/src/include/rrdb/rrdb_types.h
+++ b/src/include/rrdb/rrdb_types.h
@@ -113,6 +113,10 @@ class scan_request;
 
 class scan_response;
 
+class duplicate_request;
+
+class duplicate_response;
+
 typedef struct _update_request__isset
 {
     _update_request__isset() : key(false), value(false), expire_ts_seconds(false) {}
@@ -1791,6 +1795,129 @@ inline std::ostream &operator<<(std::ostream &out, const scan_response &obj)
     obj.printTo(out);
     return out;
 }
+
+typedef struct _duplicate_request__isset // one presence bit per optional field of duplicate_request
+{
+    _duplicate_request__isset()
+        : timestamp(false), task_code(false), raw_message(false), cluster_id(false)
+    {
+    }
+    bool timestamp : 1;
+    bool task_code : 1;
+    bool raw_message : 1;
+    bool cluster_id : 1;
+} _duplicate_request__isset;
+
+class duplicate_request
+{
+public:
+    duplicate_request(const duplicate_request &);
+    duplicate_request(duplicate_request &&);
+    duplicate_request &operator=(const duplicate_request &);
+    duplicate_request &operator=(duplicate_request &&);
+    duplicate_request() : timestamp(0), cluster_id(0) {}
+
+    virtual ~duplicate_request() throw();
+    int64_t timestamp;
+    ::dsn::task_code task_code;
+    ::dsn::blob raw_message;
+    int8_t cluster_id;
+
+    _duplicate_request__isset __isset;
+
+    void __set_timestamp(const int64_t val);
+
+    void __set_task_code(const ::dsn::task_code &val);
+
+    void __set_raw_message(const ::dsn::blob &val);
+
+    void __set_cluster_id(const int8_t val);
+
+    bool operator==(const duplicate_request &rhs) const // equal when the same fields are set and every set field compares equal
+    {
+        if (__isset.timestamp != rhs.__isset.timestamp)
+            return false;
+        else if (__isset.timestamp && !(timestamp == rhs.timestamp))
+            return false;
+        if (__isset.task_code != rhs.__isset.task_code)
+            return false;
+        else if (__isset.task_code && !(task_code == rhs.task_code))
+            return false;
+        if (__isset.raw_message != rhs.__isset.raw_message)
+            return false;
+        else if (__isset.raw_message && !(raw_message == rhs.raw_message))
+            return false;
+        if (__isset.cluster_id != rhs.__isset.cluster_id)
+            return false;
+        else if (__isset.cluster_id && !(cluster_id == rhs.cluster_id))
+            return false;
+        return true;
+    }
+    bool operator!=(const duplicate_request &rhs) const { return !(*this == rhs); }
+
+    bool operator<(const duplicate_request &) const; // NOTE(review): no definition is visible in the rrdb_types.cpp hunk of this patch
+
+    uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
+    uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
+
+    virtual void printTo(std::ostream &out) const;
+};
+
+void swap(duplicate_request &a, duplicate_request &b);
+
+inline std::ostream &operator<<(std::ostream &out, const duplicate_request &obj)
+{
+    obj.printTo(out);
+    return out;
+}
+
+typedef struct _duplicate_response__isset // presence bit for the optional `error` field
+{
+    _duplicate_response__isset() : error(false) {}
+    bool error : 1;
+} _duplicate_response__isset;
+
+class duplicate_response
+{
+public:
+    duplicate_response(const duplicate_response &);
+    duplicate_response(duplicate_response &&);
+    duplicate_response &operator=(const duplicate_response &);
+    duplicate_response &operator=(duplicate_response &&);
+    duplicate_response() : error(0) {}
+
+    virtual ~duplicate_response() throw();
+    int32_t error;
+
+    _duplicate_response__isset __isset;
+
+    void __set_error(const int32_t val);
+
+    bool operator==(const duplicate_response &rhs) const // equal when `error` is set on both or neither and, if set, has the same value
+    {
+        if (__isset.error != rhs.__isset.error)
+            return false;
+        else if (__isset.error && !(error == rhs.error))
+            return false;
+        return true;
+    }
+    bool operator!=(const duplicate_response &rhs) const { return !(*this == rhs); }
+
+    bool operator<(const duplicate_response &) const; // NOTE(review): no definition is visible in the rrdb_types.cpp hunk of this patch
+
+    uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
+    uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
+
+    virtual void printTo(std::ostream &out) const;
+};
+
+void swap(duplicate_response &a, duplicate_response &b);
+
+inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj)
+{
+    obj.printTo(out);
+    return out;
+}
 }
 } // namespace
diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp
new file mode 100644
index 0000000000..7b77542387
--- /dev/null
+++ 
b/src/server/pegasus_mutation_duplicator.cpp @@ -0,0 +1,205 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "pegasus_mutation_duplicator.h" +#include "pegasus_server_impl.h" +#include "base/pegasus_rpc_types.h" + +#include +#include +#include +#include + +namespace dsn { +namespace replication { + +/// static definition of mutation_duplicator::creator. +/*static*/ std::function( + replica_base *, string_view, string_view)> + mutation_duplicator::creator = [](replica_base *r, string_view remote, string_view app) { + return make_unique(r, remote, app); + }; + +} // namespace replication +} // namespace dsn + +namespace pegasus { +namespace server { + +using namespace dsn::literals::chrono_literals; + +/*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob &data) +{ + if (tc == dsn::apps::RPC_RRDB_RRDB_PUT) { + dsn::apps::update_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_key_hash(thrift_request.key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_REMOVE) { + dsn::blob raw_key; + dsn::from_blob_to_thrift(data, raw_key); + return pegasus_key_hash(raw_key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { + dsn::apps::multi_put_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.hash_key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { + dsn::apps::multi_remove_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.hash_key); + } + dfatal("unexpected task code: %s", tc.to_string()); + __builtin_unreachable(); +} + +pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::replica_base *r, + dsn::string_view remote_cluster, + dsn::string_view app) + : 
mutation_duplicator(r), _remote_cluster(remote_cluster) +{ + // initialize pegasus-client when this class is first time used. + static __attribute__((unused)) bool _dummy = pegasus_client_factory::initialize(nullptr); + + pegasus_client *client = pegasus_client_factory::get_client(remote_cluster.data(), app.data()); + _client = static_cast(client); + + auto ret = dsn::replication::get_duplication_cluster_id(remote_cluster.data()); + dassert_replica(ret.is_ok(), // never possible, meta server disallows such remote_cluster. + "invalid remote cluster: {}, err_ret: {}", + remote_cluster, + ret.get_error()); + _remote_cluster_id = static_cast(ret.get_value()); + + ddebug_replica("initialize mutation duplicator for local cluster [id:{}], " + "remote cluster [id:{}, addr:{}]", + get_current_cluster_id(), + _remote_cluster_id, + remote_cluster); + + // never possible to duplicate data to itself + dassert_replica(get_current_cluster_id() != _remote_cluster_id, + "invalid remote cluster: {} {}", + remote_cluster, + _remote_cluster_id); + + std::string str_gpid = fmt::format("{}", get_gpid()); + _shipped_ops.init_app_counter("app.pegasus", + fmt::format("dup_shipped_ops@{}", str_gpid).c_str(), + COUNTER_TYPE_RATE, + "the total ops of DUPLICATE requests sent from this app"); + _failed_shipping_ops.init_app_counter( + "app.pegasus", + fmt::format("dup_failed_shipping_ops@{}", str_gpid).c_str(), + COUNTER_TYPE_RATE, + "the qps of failed DUPLICATE requests sent from this app"); +} + +void pegasus_mutation_duplicator::send(uint64_t hash, callback cb) +{ + duplicate_rpc rpc; + { + dsn::zauto_lock _(_lock); + rpc = _inflights[hash].front(); + _inflights[hash].pop_front(); + } + + _client->async_duplicate(rpc, + [hash, cb, rpc, this](dsn::error_code err) mutable { + on_duplicate_reply(hash, std::move(cb), std::move(rpc), err); + }, + _env.__conf.tracker); +} + +void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, + mutation_duplicator::callback cb, + duplicate_rpc rpc, 
+ dsn::error_code err) +{ + int perr = PERR_OK; + if (err == dsn::ERR_OK) { + perr = client::pegasus_client_impl::get_client_error( + client::pegasus_client_impl::get_rocksdb_server_error(rpc.response().error)); + } + + if (perr != PERR_OK || err != dsn::ERR_OK) { + _failed_shipping_ops->increment(); + + // randomly log the 1% of the failed duplicate rpc, because minor number of + // errors are acceptable. + // TODO(wutao1): print the entire request for future debugging. + if (dsn::rand::next_double01() <= 0.01) { + derror_replica("duplicate_rpc failed: {} [code:{}, timestamp:{}]", + err == dsn::ERR_OK ? _client->get_error_string(perr) : err.to_string(), + rpc.request().timestamp); + } + } else { + _shipped_ops->increment(); + _total_shipped_size += + rpc.dsn_request()->header->body_length + rpc.dsn_request()->header->hdr_length; + } + + { + dsn::zauto_lock _(_lock); + if (perr != PERR_OK || err != dsn::ERR_OK) { + // retry this rpc + _inflights[hash].push_front(rpc); + _env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s); + return; + } + if (_inflights[hash].empty()) { + _inflights.erase(hash); + if (_inflights.empty()) { + // move forward to the next step. 
+ cb(_total_shipped_size); + } + } else { + // start next rpc immediately + _env.schedule([hash, cb, this]() { send(hash, cb); }); + return; + } + } +} + +void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb) +{ + _total_shipped_size = 0; + + for (auto mut : muts) { + // mut: 0=timestamp, 1=rpc_code, 2=raw_message + + dsn::task_code rpc_code = std::get<1>(mut); + dsn::blob raw_message = std::get<2>(mut); + auto dreq = dsn::make_unique(); + uint64_t hash = get_hash_from_request(rpc_code, raw_message); + + if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) { + // ignore if it is a DUPLICATE + } else { + dreq->__set_raw_message(raw_message); + dreq->__set_task_code(rpc_code); + dreq->__set_timestamp(std::get<0>(mut)); + dreq->__set_cluster_id(get_current_cluster_id()); + } + + duplicate_rpc rpc(std::move(dreq), + dsn::apps::RPC_RRDB_RRDB_DUPLICATE, + 10_s, // TODO(wutao1): configurable timeout. + hash); + _inflights[hash].push_back(std::move(rpc)); + } + + if (_inflights.empty()) { + cb(0); + return; + } + auto inflights = _inflights; + for (const auto &kv : inflights) { + send(kv.first, cb); + } +} + +} // namespace server +} // namespace pegasus diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h new file mode 100644 index 0000000000..681add796e --- /dev/null +++ b/src/server/pegasus_mutation_duplicator.h @@ -0,0 +1,64 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include +#include + +#include "client_lib/pegasus_client_factory_impl.h" + +namespace pegasus { +namespace server { + +using namespace dsn::literals::chrono_literals; + +// Duplicates the loaded mutations to the remote pegasus cluster using pegasus client. 
+class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator
+{
+    using mutation_tuple_set = dsn::replication::mutation_tuple_set;
+    using mutation_tuple = dsn::replication::mutation_tuple;
+    using duplicate_rpc = dsn::apps::duplicate_rpc;
+
+public:
+    pegasus_mutation_duplicator(dsn::replication::replica_base *r,
+                                dsn::string_view remote_cluster,
+                                dsn::string_view app);
+
+    void duplicate(mutation_tuple_set muts, callback cb) override; // queues one duplicate_rpc per mutation, then starts sending each hash bucket
+
+    ~pegasus_mutation_duplicator() override { _env.__conf.tracker->wait_outstanding_tasks(); } // waits for the tasks tracked by the environment's tracker
+
+private:
+    void send(uint64_t hash, callback cb); // pops the front rpc of bucket `hash` and sends it
+
+    void on_duplicate_reply(uint64_t hash, callback, duplicate_rpc, dsn::error_code err); // re-queues a failed rpc, otherwise advances the bucket or invokes the callback
+
+private:
+    friend class pegasus_mutation_duplicator_test;
+
+    client::pegasus_client_impl *_client{nullptr}; // obtained from pegasus_client_factory in the constructor
+
+    uint8_t _remote_cluster_id{0};
+    std::string _remote_cluster;
+
+    // The duplicate_rpc are isolated by their hash value from hash key.
+    // Writes with the same hash are duplicated in mutation order to preserve data consistency,
+    // otherwise they are duplicated concurrently to improve performance.
+    std::map> _inflights; // hash -> duplicate_rpc
+    dsn::zlock _lock; // held by send() and on_duplicate_reply() while they touch _inflights
+
+    size_t _total_shipped_size{0}; // reset by duplicate(), grown by each successful reply, passed to the callback at the end
+
+    dsn::perf_counter_wrapper _shipped_ops; // incremented per successful duplicate_rpc
+    dsn::perf_counter_wrapper _failed_shipping_ops; // incremented per failed duplicate_rpc
+};
+
+// Decodes the binary `request_data` into write request in thrift struct, and
+// calculates the hash value from the write's hash key.
+extern uint64_t get_hash_from_request(dsn::task_code rpc_code, const dsn::blob &request_data);
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h
index 6f48cffa0f..284ad23195 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -6,6 +6,7 @@
 
 #include
 #include
+#include
 
 #include "base/pegasus_value_schema.h"
 #include "base/pegasus_utils.h"
@@ -14,6 +15,14 @@
 namespace pegasus {
 namespace server {
 
+inline uint8_t get_current_cluster_id() // returns the duplication cluster id looked up from the current cluster name
+{
+    static const uint8_t cluster_id = // function-local static: the lookup runs once, on the first call
+        dsn::replication::get_duplication_cluster_id(dsn::replication::get_current_cluster_name())
+            .get_value(); // NOTE(review): get_value() is used without checking is_ok() first - confirm the lookup cannot fail here
+    return cluster_id;
+}
+
 class pegasus_server_impl;
 class capacity_unit_calculator;
 
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index 5dc8e3cc0d..df5f5a73f9 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -6,6 +6,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
                 "../pegasus_write_service.cpp"
                 "../pegasus_server_write.cpp"
                 "../capacity_unit_calculator.cpp"
+                "../pegasus_mutation_duplicator.cpp"
 )
 
 set(MY_SRC_SEARCH_MODE "GLOB")
diff --git a/src/server/test/config.ini b/src/server/test/config.ini
index a00fe5c9fa..391319eea2 100644
--- a/src/server/test/config.ini
+++ b/src/server/test/config.ini
@@ -125,6 +125,7 @@ stateful = true
 
 [replication]
 data_dirs_black_list_file = /home/mi/.pegasus_data_dirs_black_list
+cluster_name = onebox
 
 deny_client_on_start = false
 delay_for_fd_timeout_on_start = false
@@ -497,5 +498,10 @@ profiler::cancelled = false
 [meta_server]
 server_list = 0.0.0.0:34701
 
+[duplication-group]
+onebox = 1
+onebox2 = 2
+
 [pegasus.clusters]
 onebox = @LOCAL_IP@:34701,@LOCAL_IP@:34702,@LOCAL_IP@:34703
+onebox2 = @LOCAL_IP@:34701,@LOCAL_IP@:34702,@LOCAL_IP@:34703
diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp
new file 
mode 100644
index 0000000000..f79eea034d
--- /dev/null
+++ b/src/server/test/pegasus_mutation_duplicator_test.cpp
@@ -0,0 +1,296 @@
+// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
+// This source code is licensed under the Apache License Version 2.0, which
+// can be found in the LICENSE file in the root directory of this source tree.
+
+#include "server/pegasus_mutation_duplicator.h"
+#include "base/pegasus_rpc_types.h"
+#include "pegasus_server_test_base.h"
+
+#include
+#include
+#include
+#include
+
+namespace pegasus {
+namespace server {
+
+using namespace dsn::replication;
+
+class pegasus_mutation_duplicator_test : public pegasus_server_test_base
+{
+    dsn::task_tracker _tracker;
+    dsn::pipeline::environment _env;
+
+public:
+    pegasus_mutation_duplicator_test()
+    {
+        _env.thread_pool(LPC_REPLICATION_LOW).task_tracker(&_tracker); // tasks scheduled through _env are tracked by _tracker
+    }
+
+    void test_duplicate() // 100 PUTs on one hash key must be shipped one at a time
+    {
+        replica_base replica(dsn::gpid(1, 1), "fake_replica");
+        auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
+        duplicator->set_task_environment(&_env);
+
+        mutation_tuple_set muts;
+        for (uint64_t i = 0; i < 100; i++) {
+            uint64_t ts = 200 + i;
+            dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
+
+            dsn::apps::update_request request;
+            pegasus::pegasus_generate_key(request.key, std::string("hash"), std::string("sort")); // same key for every mutation, so they share one bucket
+            dsn::message_ptr msg =
+                dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT);
+            auto data = dsn::move_message_to_blob(msg.get());
+
+            muts.insert(std::make_tuple(ts, code, data));
+        }
+
+        size_t total_shipped_size = 0;
+        auto duplicator_impl = dynamic_cast(duplicator.get());
+        RPC_MOCKING(duplicate_rpc)
+        {
+            duplicator->duplicate(muts, [](size_t) {});
+
+            size_t total_size = 100;
+            while (total_size > 0) {
+                // ensure mutations having the same hash are sending sequentially.
+                ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
+                ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
+
+                total_size--;
+                ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), total_size);
+
+                auto rpc = duplicate_rpc::mail_box().back();
+                duplicate_rpc::mail_box().pop_back();
+
+                total_shipped_size +=
+                    rpc.dsn_request()->body_size() + rpc.dsn_request()->header->hdr_length;
+                duplicator_impl->on_duplicate_reply(get_hash(rpc), // reply delivered by hand with ERR_OK
+                                                    [total_shipped_size](size_t final_size) {
+                                                        ASSERT_EQ(total_shipped_size, final_size);
+                                                    },
+                                                    rpc,
+                                                    dsn::ERR_OK);
+
+                // schedule next round
+                _tracker.wait_outstanding_tasks();
+            }
+
+            ASSERT_EQ(duplicator_impl->_total_shipped_size, total_shipped_size);
+            ASSERT_EQ(duplicator_impl->_inflights.size(), 0);
+            ASSERT_EQ(duplicate_rpc::mail_box().size(), 0);
+        }
+    }
+
+    void test_duplicate_failed() // every failed reply must put the rpc back and resend it
+    {
+        replica_base replica(dsn::gpid(1, 1), "fake_replica");
+        auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
+        duplicator->set_task_environment(&_env);
+
+        mutation_tuple_set muts;
+        for (uint64_t i = 0; i < 10; i++) {
+            uint64_t ts = 200 + i;
+            dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
+
+            dsn::apps::update_request request;
+            pegasus::pegasus_generate_key(request.key, std::string("hash"), std::string("sort"));
+            dsn::message_ptr msg =
+                dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT);
+            auto data = dsn::move_message_to_blob(msg.get());
+
+            muts.insert(std::make_tuple(ts, code, data));
+        }
+
+        auto duplicator_impl = dynamic_cast(duplicator.get());
+        RPC_MOCKING(duplicate_rpc)
+        {
+            duplicator->duplicate(muts, [](size_t) {});
+
+            auto rpc = duplicate_rpc::mail_box().back();
+            duplicate_rpc::mail_box().pop_back();
+            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+
+            // failed
+            duplicator_impl->on_duplicate_reply(
+                get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_TIMEOUT);
+
+            // schedule next round
+            _tracker.wait_outstanding_tasks();
+
+            // retry infinitely
+            ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
+            ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
+            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+            duplicate_rpc::mail_box().clear();
+
+            // with other error
+            rpc.response().error = PERR_INVALID_ARGUMENT; // transport is OK, the failure comes from the response's error field
+            duplicator_impl->on_duplicate_reply(get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_OK);
+            _tracker.wait_outstanding_tasks();
+            ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
+            ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
+            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+            duplicate_rpc::mail_box().clear();
+
+            // with other error
+            rpc.response().error = PERR_OK; // response is OK, the failure comes from the rpc error code
+            duplicator_impl->on_duplicate_reply(
+                get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_IO_PENDING);
+            _tracker.wait_outstanding_tasks();
+            ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
+            ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
+            ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
+            duplicate_rpc::mail_box().clear();
+        }
+    }
+
+    void test_duplicate_isolated_hashkeys() // 3000 distinct hash keys are expected to be sent all at once
+    {
+        replica_base replica(dsn::gpid(1, 1), "fake_replica");
+        auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
+        duplicator->set_task_environment(&_env);
+
+        size_t total_size = 3000;
+        mutation_tuple_set muts;
+        for (uint64_t i = 0; i < total_size; i++) {
+            uint64_t ts = 200 + i;
+            dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
+
+            dsn::apps::update_request request;
+            pegasus::pegasus_generate_key(
+                request.key, std::string("hash") + std::to_string(i), std::string("sort")); // a different hash key per mutation
+            dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(request, code);
+            auto data = dsn::move_message_to_blob(msg.get());
+
+            muts.insert(std::make_tuple(ts, code, data));
+        }
+
+        auto duplicator_impl = dynamic_cast(duplicator.get());
+        RPC_MOCKING(duplicate_rpc)
+        {
+            duplicator->duplicate(muts, [](size_t) {});
+
+            // ensure each bucket has only 1 request and each request is
+            // isolated with others.
+            ASSERT_EQ(duplicator_impl->_inflights.size(), total_size);
+            ASSERT_EQ(duplicate_rpc::mail_box().size(), total_size);
+            for (const auto &ents : duplicator_impl->_inflights) {
+                ASSERT_EQ(ents.second.size(), 0); // the single rpc of each bucket was already popped by send()
+            }
+
+            // reply with success
+            auto rpc_list = std::move(duplicate_rpc::mail_box());
+            for (const auto &rpc : rpc_list) {
+                rpc.response().error = dsn::ERR_OK;
+                duplicator_impl->on_duplicate_reply(get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_OK);
+            }
+            _tracker.wait_outstanding_tasks();
+            ASSERT_EQ(duplicate_rpc::mail_box().size(), 0);
+            ASSERT_EQ(duplicator_impl->_inflights.size(), 0);
+        }
+    }
+
+    void test_create_duplicator() // the ids checked below match the [duplication-group] section of the test config.ini
+    {
+        replica_base replica(dsn::gpid(1, 1), "fake_replica");
+        auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
+        duplicator->set_task_environment(&_env);
+        auto duplicator_impl = dynamic_cast(duplicator.get());
+        ASSERT_EQ(duplicator_impl->_remote_cluster_id, 2);
+        ASSERT_EQ(duplicator_impl->_remote_cluster, "onebox2");
+        ASSERT_EQ(get_current_cluster_id(), 1);
+    }
+
+private:
+    static uint64_t get_hash(const duplicate_rpc &rpc) // hash of the write wrapped inside the duplicate request
+    {
+        return get_hash_from_request(rpc.request().task_code, rpc.request().raw_message);
+    }
+};
+
+TEST_F(pegasus_mutation_duplicator_test, get_hash_from_request)
+{
+    std::string hash_key("hash");
+    std::string sort_key("sort");
+    uint64_t hash =
+        pegasus::pegasus_hash_key_hash(dsn::blob(hash_key.data(), 0, hash_key.length()));
+
+    {
+        dsn::apps::multi_put_request request;
+        request.hash_key.assign(hash_key.data(), 0, hash_key.length());
+        dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(
+            request, dsn::apps::RPC_RRDB_RRDB_MULTI_PUT);
+        auto data = dsn::move_message_to_blob(msg.get());
+        ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_MULTI_PUT, data));
+    }
+
+    {
+        dsn::apps::multi_remove_request request;
+        request.hash_key.assign(hash_key.data(), 0, hash_key.length());
+        dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(
+ request, dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE); + + auto data = dsn::move_message_to_blob(msg.get()); + ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE, data)); + } + + { + dsn::apps::update_request request; + pegasus::pegasus_generate_key(request.key, hash_key, sort_key); + dsn::message_ptr msg = + dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT); + auto data = dsn::move_message_to_blob(msg.get()); + ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_PUT, data)); + } + + { + dsn::blob key; + pegasus::pegasus_generate_key(key, hash_key, sort_key); + dsn::message_ptr msg = + dsn::from_thrift_request_to_received_message(key, dsn::apps::RPC_RRDB_RRDB_REMOVE); + auto data = dsn::move_message_to_blob(msg.get()); + ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_REMOVE, data)); + } +} + +// Verifies that calls on `get_hash_key_from_request` won't make +// message unable to read. (if `get_hash_key_from_request` doesn't +// use copy the message internally, it will.) 
+TEST_F(pegasus_mutation_duplicator_test, read_after_get_hash_key) +{ + std::string hash_key("hash"); + std::string sort_key("sort"); + uint64_t hash = + pegasus::pegasus_hash_key_hash(dsn::blob(hash_key.data(), 0, hash_key.length())); + + dsn::message_ex *msg; + { + dsn::apps::update_request request; + pegasus::pegasus_generate_key(request.key, hash_key, sort_key); + msg = dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT); + } + auto data = dsn::move_message_to_blob(msg); + ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_PUT, data)); + + pegasus::put_rpc rpc(msg); + dsn::blob raw_key; + pegasus::pegasus_generate_key(raw_key, hash_key, sort_key); + ASSERT_EQ(rpc.request().key.to_string(), raw_key.to_string()); +} + +TEST_F(pegasus_mutation_duplicator_test, duplicate) { test_duplicate(); } + +TEST_F(pegasus_mutation_duplicator_test, duplicate_failed) { test_duplicate_failed(); } + +TEST_F(pegasus_mutation_duplicator_test, duplicate_isolated_hashkeys) +{ + test_duplicate_isolated_hashkeys(); +} + +TEST_F(pegasus_mutation_duplicator_test, create_duplicator) { test_create_duplicator(); } + +} // namespace server +} // namespace pegasus