From 782adba1a182e4a6198e9b0e21bee0480a8609ea Mon Sep 17 00:00:00 2001 From: Wu Tao Date: Thu, 19 Sep 2019 14:54:31 +0800 Subject: [PATCH] feat(dup): implement ship_mutation stage and mutation_batch (#312) --- src/dist/replication/lib/CMakeLists.txt | 1 + .../lib/duplication/duplication_pipeline.cpp | 47 ++++++- .../lib/duplication/duplication_pipeline.h | 16 +++ .../lib/duplication/mutation_batch.cpp | 119 ++++++++++++++++++ .../lib/duplication/mutation_batch.h | 73 +++++++++++ .../lib/duplication/replica_duplicator.h | 2 + .../duplication/test/duplication_test_base.h | 17 ++- .../duplication/test/mutation_batch_test.cpp | 63 ++++++++++ .../duplication/test/ship_mutation_test.cpp | 75 +++++++++++ .../test/replica_test/unit_test/mock_utils.h | 31 +++++ 10 files changed, 440 insertions(+), 4 deletions(-) create mode 100644 src/dist/replication/lib/duplication/mutation_batch.cpp create mode 100644 src/dist/replication/lib/duplication/mutation_batch.h create mode 100644 src/dist/replication/lib/duplication/test/mutation_batch_test.cpp create mode 100644 src/dist/replication/lib/duplication/test/ship_mutation_test.cpp diff --git a/src/dist/replication/lib/CMakeLists.txt b/src/dist/replication/lib/CMakeLists.txt index a47e624460..ec7392a2d9 100644 --- a/src/dist/replication/lib/CMakeLists.txt +++ b/src/dist/replication/lib/CMakeLists.txt @@ -6,6 +6,7 @@ set(DUPLICATION_SRC duplication/replica_duplicator.cpp duplication/duplication_pipeline.cpp duplication/load_from_private_log.cpp + duplication/mutation_batch.cpp ) # Source files under CURRENT project directory will be automatically included. diff --git a/src/dist/replication/lib/duplication/duplication_pipeline.cpp b/src/dist/replication/lib/duplication/duplication_pipeline.cpp index 6771e2ba33..2c5d6a1071 100644 --- a/src/dist/replication/lib/duplication/duplication_pipeline.cpp +++ b/src/dist/replication/lib/duplication/duplication_pipeline.cpp @@ -12,6 +12,14 @@ namespace dsn { namespace replication { +// // +// mutation_duplicator // +// // + +/*static*/ std::function( + replica_base *, string_view /*remote cluster*/, string_view /*app*/)> + mutation_duplicator::creator; + // // // load_mutation // // // @@ -35,14 +43,47 @@ load_mutation::load_mutation(replica_duplicator *duplicator, // ship_mutation // // // +void ship_mutation::ship(mutation_tuple_set &&in) +{ + _mutation_duplicator->duplicate(std::move(in), [this](size_t total_shipped_size) mutable { + update_progress(); + step_down_next_stage(); + }); +} + void ship_mutation::run(decree &&last_decree, mutation_tuple_set &&in) { - // TBD + _last_decree = last_decree; + + if (in.empty()) { + update_progress(); + step_down_next_stage(); + return; + } + + ship(std::move(in)); } -ship_mutation::ship_mutation(replica_duplicator *duplicator) : replica_base(duplicator) +void ship_mutation::update_progress() { - // TBD + dcheck_eq_replica( + _duplicator->update_progress(duplication_progress().set_last_decree(_last_decree)), + error_s::ok()); + + // committed decree never decreases + decree last_committed_decree = _replica->last_committed_decree(); + dcheck_ge_replica(last_committed_decree, _last_decree); +} + +ship_mutation::ship_mutation(replica_duplicator *duplicator) + : replica_base(duplicator), + _duplicator(duplicator), + _replica(duplicator->_replica), + _stub(duplicator->_replica->get_replica_stub()) +{ + _mutation_duplicator = new_mutation_duplicator( + duplicator, _duplicator->remote_cluster_name(), _replica->get_app_info()->app_name); + _mutation_duplicator->set_task_environment(duplicator); } } // namespace replication diff --git a/src/dist/replication/lib/duplication/duplication_pipeline.h b/src/dist/replication/lib/duplication/duplication_pipeline.h index 3bb7049902..60ca72e04c 100644 --- a/src/dist/replication/lib/duplication/duplication_pipeline.h +++ b/src/dist/replication/lib/duplication/duplication_pipeline.h @@ -47,6 +47,22 @@ class ship_mutation : public replica_base, /// ==== Implementation ==== /// explicit ship_mutation(replica_duplicator *duplicator); + + void ship(mutation_tuple_set &&in); + +private: + void update_progress(); + + friend struct ship_mutation_test; + friend class replica_duplicator_test; + + std::unique_ptr _mutation_duplicator; + + replica_duplicator *_duplicator; + replica *_replica; + replica_stub *_stub; + + decree _last_decree{invalid_decree}; }; } // namespace replication diff --git a/src/dist/replication/lib/duplication/mutation_batch.cpp b/src/dist/replication/lib/duplication/mutation_batch.cpp new file mode 100644 index 0000000000..5ed4dbc512 --- /dev/null +++ b/src/dist/replication/lib/duplication/mutation_batch.cpp @@ -0,0 +1,119 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include +#include + +#include "replica_duplicator.h" +#include "mutation_batch.h" +#include "dist/replication/lib/prepare_list.h" + +namespace dsn { +namespace replication { + +/*static*/ constexpr int64_t mutation_batch::PREPARE_LIST_NUM_ENTRIES; + +error_s mutation_batch::add(mutation_ptr mu) +{ + if (mu->get_decree() <= _mutation_buffer->last_committed_decree()) { + // ignore + return error_s::ok(); + } + + auto old = _mutation_buffer->get_mutation_by_decree(mu->get_decree()); + if (old != nullptr && old->data.header.ballot >= mu->data.header.ballot) { + // ignore + return error_s::ok(); + } + + error_code ec = _mutation_buffer->prepare(mu, partition_status::PS_INACTIVE); + if (ec != ERR_OK) { + return FMT_ERR( + ERR_INVALID_DATA, + "failed to add mutation [err:{}, logged:{}, decree:{}, committed:{}, start_decree:{}]", + ec.to_string(), + mu->is_logged(), + mu->get_decree(), + mu->data.header.last_committed_decree, + _start_decree); + } + + return error_s::ok(); +} + +decree mutation_batch::last_decree() const { return _mutation_buffer->last_committed_decree(); } + +void mutation_batch::set_start_decree(decree d) { _start_decree = d; } + +mutation_tuple_set mutation_batch::move_all_mutations() +{ + // free the internal space + _mutation_buffer->truncate(last_decree()); + return std::move(_loaded_mutations); +} + +mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r) +{ + // Prepend a special tag identifying this is a mutation_batch, + // so `dxxx_replica` logging in prepare_list will print along with its real caller. + // This helps for debugging. + replica_base base(r->get_gpid(), std::string("mutation_batch@") + r->replica_name()); + _mutation_buffer = + make_unique(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) { + // committer + add_mutation_if_valid(mu, _loaded_mutations, _start_decree); + }); + + // start duplication from confirmed_decree + _mutation_buffer->reset(r->progress().confirmed_decree); +} + +/*extern*/ void +add_mutation_if_valid(mutation_ptr &mu, mutation_tuple_set &mutations, decree start_decree) +{ + if (mu->get_decree() < start_decree) { + // ignore + return; + } + for (mutation_update &update : mu->data.updates) { + // ignore WRITE_EMPTY + if (update.code == RPC_REPLICATION_WRITE_EMPTY) { + continue; + } + + blob bb; + if (update.data.buffer() != nullptr) { + bb = std::move(update.data); + } else { + bb = blob::create_from_bytes(update.data.data(), update.data.length()); + } + + mutations.emplace(std::make_tuple(mu->data.header.timestamp, update.code, std::move(bb))); + } +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/duplication/mutation_batch.h b/src/dist/replication/lib/duplication/mutation_batch.h new file mode 100644 index 0000000000..ab424ec9ea --- /dev/null +++ b/src/dist/replication/lib/duplication/mutation_batch.h @@ -0,0 +1,73 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#pragma once + +#include + +#include "dist/replication/lib/mutation.h" + +namespace dsn { +namespace replication { + +class replica_duplicator; +class prepare_list; + +// A sorted array of committed mutations that are ready for duplication. +// Not thread-safe. +class mutation_batch : replica_base +{ +public: + static constexpr int64_t PREPARE_LIST_NUM_ENTRIES{200}; + + explicit mutation_batch(replica_duplicator *r); + + error_s add(mutation_ptr mu); + + mutation_tuple_set move_all_mutations(); + + decree last_decree() const; + + // mutations with decree < d will be ignored. + void set_start_decree(decree d); + + size_t size() const { return _loaded_mutations.size(); } + +private: + friend class replica_duplicator_test; + + std::unique_ptr _mutation_buffer; + mutation_tuple_set _loaded_mutations; + decree _start_decree{invalid_decree}; +}; + +using mutation_batch_u_ptr = std::unique_ptr; + +/// Extract mutations into mutation_tuple_set if they are not WRITE_EMPTY. +extern void add_mutation_if_valid(mutation_ptr &, mutation_tuple_set &, decree start_decree); + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/duplication/replica_duplicator.h b/src/dist/replication/lib/duplication/replica_duplicator.h index 3914c875ab..9a572dba2a 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator.h +++ b/src/dist/replication/lib/duplication/replica_duplicator.h @@ -67,6 +67,8 @@ class replica_duplicator : public replica_base, public pipeline::base dupid_t id() const { return _id; } + const std::string &remote_cluster_name() const { return _remote_cluster_name; } + // Thread-safe duplication_progress progress() const { diff --git a/src/dist/replication/lib/duplication/test/duplication_test_base.h b/src/dist/replication/lib/duplication/test/duplication_test_base.h index e675cdfde9..30a244ae8e 100644 --- a/src/dist/replication/lib/duplication/test/duplication_test_base.h +++ b/src/dist/replication/lib/duplication/test/duplication_test_base.h @@ -14,7 +14,12 @@ namespace replication { class duplication_test_base : public replica_test_base { public: - duplication_test_base() {} + duplication_test_base() + { + mutation_duplicator::creator = [](replica_base *r, dsn::string_view, dsn::string_view) { + return make_unique(r); + }; + } void add_dup(mock_replica *r, replica_duplicator_u_ptr dup) { @@ -29,6 +34,16 @@ class duplication_test_base : public replica_test_base } return dup_entities[dupid].get(); } + + std::unique_ptr create_test_duplicator(decree confirmed = invalid_decree) + { + duplication_entry dup_ent; + dup_ent.dupid = 1; + dup_ent.remote = "remote_address"; + dup_ent.status = duplication_status::DS_PAUSE; + dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed; + return make_unique(dup_ent, _replica.get()); + } }; } // namespace replication diff --git a/src/dist/replication/lib/duplication/test/mutation_batch_test.cpp b/src/dist/replication/lib/duplication/test/mutation_batch_test.cpp new file mode 100644 index 0000000000..ab6b6be114 --- /dev/null +++ b/src/dist/replication/lib/duplication/test/mutation_batch_test.cpp @@ -0,0 +1,63 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "dist/replication/test/replica_test/unit_test/replica_test_base.h" +#include "dist/replication/lib/duplication/mutation_batch.h" + +namespace dsn { +namespace replication { + +class mutation_batch_test : public replica_test_base +{ +}; + +TEST_F(mutation_batch_test, add_mutation_if_valid) +{ + mutation_tuple_set result; + + std::string s = "hello"; + mutation_ptr mu1 = create_test_mutation(1, s); + add_mutation_if_valid(mu1, result, 0); + mutation_tuple mt1 = *result.begin(); + + result.clear(); + + s = "world"; + mutation_ptr mu2 = create_test_mutation(2, s); + add_mutation_if_valid(mu2, result, 0); + mutation_tuple mt2 = *result.begin(); + + ASSERT_EQ(std::get<2>(mt1).to_string(), "hello"); + ASSERT_EQ(std::get<2>(mt2).to_string(), "world"); + + // decree 1 should be ignored + mutation_ptr mu3 = create_test_mutation(1, s); + add_mutation_if_valid(mu2, result, 2); + ASSERT_EQ(result.size(), 2); +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/duplication/test/ship_mutation_test.cpp b/src/dist/replication/lib/duplication/test/ship_mutation_test.cpp new file mode 100644 index 0000000000..389b626f44 --- /dev/null +++ b/src/dist/replication/lib/duplication/test/ship_mutation_test.cpp @@ -0,0 +1,75 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "dist/replication/lib/duplication/mutation_batch.h" +#include "dist/replication/lib/duplication/duplication_pipeline.h" +#include "duplication_test_base.h" + +namespace dsn { +namespace replication { + +/*static*/ mock_mutation_duplicator::duplicate_function mock_mutation_duplicator::_func; + +struct mock_stage : pipeline::when<> +{ + void run() override {} +}; + +struct ship_mutation_test : public duplication_test_base +{ + ship_mutation_test() + { + _replica->init_private_log(_log_dir); + duplicator = create_test_duplicator(); + } + + // ensure ship_mutation retries after error. + // ensure it clears up all pending mutations after stage ends. + // ensure it update duplicator->last_decree after stage ends. + void test_ship_mutation_tuple_set() + { + ship_mutation shipper(duplicator.get()); + mock_stage end; + + pipeline::base base; + base.thread_pool(LPC_REPLICATION_LONG_LOW).task_tracker(_replica->tracker()); + base.from(shipper).link(end); + + mutation_batch batch(duplicator.get()); + batch.add(create_test_mutation(1, "hello")); + batch.add(create_test_mutation(2, "hello")); + mutation_tuple_set in = batch.move_all_mutations(); + _replica->set_last_committed_decree(2); + + std::vector expected; + for (auto mut : in) { + expected.push_back(std::move(mut)); + } + + mock_mutation_duplicator::mock( + [&expected](mutation_tuple_set muts, mutation_duplicator::callback cb) { + int i = 0; + for (auto mut : muts) { + ASSERT_EQ(std::get<0>(expected[i]), std::get<0>(mut)); + ASSERT_EQ(std::get<1>(expected[i]), std::get<1>(mut)); + ASSERT_EQ(std::get<2>(expected[i]).to_string(), std::get<2>(mut).to_string()); + ASSERT_EQ(std::get<2>(expected[i]).to_string(), "hello"); + i++; + } + cb(0); + }); + + shipper.run(2, std::move(in)); + + base.wait_all(); + ASSERT_EQ(duplicator->progress().last_decree, 2); + } + + std::unique_ptr duplicator; +}; + +TEST_F(ship_mutation_test, ship_mutation_tuple_set) { test_ship_mutation_tuple_set(); } + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index 0cd010cf07..61db86ca38 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -27,6 +27,7 @@ #pragma once #include +#include #include #include @@ -88,6 +89,24 @@ class mock_replica : public replica _app.reset(nullptr); } + void init_private_log(const std::string &log_dir) + { + utils::filesystem::remove_path(log_dir); + + _private_log = + new mutation_log_private(log_dir, + _options->log_private_file_size_mb, + get_gpid(), + this, + _options->log_private_batch_buffer_kb * 1024, + _options->log_private_batch_buffer_count, + _options->log_private_batch_buffer_flush_interval_ms); + + error_code err = + _private_log->open(nullptr, [this](error_code err) { dcheck_eq_replica(err, ERR_OK); }); + dcheck_eq_replica(err, ERR_OK); + } + replica_duplicator_manager &get_replica_duplicator_manager() { return *_duplication_mgr; } void as_primary() { _config.status = partition_status::PS_PRIMARY; } @@ -99,6 +118,7 @@ class mock_replica : public replica void set_partition_status(partition_status::type status) { _config.status = status; } void set_child_gpid(gpid pid) { _child_gpid = pid; } void set_init_child_ballot(ballot b) { _child_init_ballot = b; } + void set_last_committed_decree(decree d) { _prepare_list->reset(d); } }; typedef dsn::ref_ptr mock_replica_ptr; @@ -253,5 +273,16 @@ class mock_mutation_log_shared : public mutation_log_shared }; typedef dsn::ref_ptr mock_mutation_log_shared_ptr; +struct mock_mutation_duplicator : public mutation_duplicator +{ + explicit mock_mutation_duplicator(replica_base *r) : mutation_duplicator(r) {} + + void duplicate(mutation_tuple_set mut, callback cb) override { _func(mut, cb); } + + typedef std::function duplicate_function; + static void mock(duplicate_function hook) { _func = std::move(hook); } + static duplicate_function _func; +}; + } // namespace replication } // namespace dsn