Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

replica: add unit tests for mutation log replay #233

Merged
merged 3 commits into from
Mar 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,22 @@ void mutation_log_shared::write_pending_mutations(bool release_lock_required)

////////////////////////////////////////////////////

mutation_log_private::mutation_log_private(const std::string &dir,
int32_t max_log_file_mb,
gpid gpid,
replica *r,
uint32_t batch_buffer_bytes,
uint32_t batch_buffer_max_count,
uint64_t batch_buffer_flush_interval_ms)
: mutation_log(dir, max_log_file_mb, gpid, r),
replica_base(r),
_batch_buffer_bytes(batch_buffer_bytes),
_batch_buffer_max_count(batch_buffer_max_count),
_batch_buffer_flush_interval_ms(batch_buffer_flush_interval_ms)
{
mutation_log_private::init_states();
}

::dsn::task_ptr mutation_log_private::append(mutation_ptr &mu,
dsn::task_code callback_code,
dsn::task_tracker *tracker,
Expand Down Expand Up @@ -483,8 +499,6 @@ void mutation_log::init_states()
_private_max_commit_on_disk = 0;
}

mutation_log::~mutation_log() { close(); }

error_code mutation_log::open(replay_callback read_callback,
io_failure_callback write_error_callback)
{
Expand Down Expand Up @@ -538,7 +552,7 @@ error_code mutation_log::open(replay_callback read_callback,

if (_is_private) {
ddebug("open private log %s succeed, start_offset = %" PRId64 ", end_offset = %" PRId64
", size = %" PRId64 ", privious_max_decree = %" PRId64,
", size = %" PRId64 ", previous_max_decree = %" PRId64,
fpath.c_str(),
log->start_offset(),
log->end_offset(),
Expand Down Expand Up @@ -2266,5 +2280,5 @@ int log_file::write_file_header(binary_writer &writer, const replica_log_info_ma

return get_file_header_size();
}
}
} // end namespace
} // namespace replication
} // namespace dsn
36 changes: 24 additions & 12 deletions src/dist/replication/lib/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <atomic>
#include <dsn/tool-api/zlocks.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/replication/replica_base.h>

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -164,7 +165,8 @@ class mutation_log : public ref_counter
// when is_private = true, should specify "private_gpid"
//
mutation_log(const std::string &dir, int32_t max_log_file_mb, gpid gpid, replica *r = nullptr);
virtual ~mutation_log();

virtual ~mutation_log() = default;

//
// initialization
Expand Down Expand Up @@ -349,6 +351,8 @@ class mutation_log : public ref_counter
dsn::task_tracker _tracker;

private:
friend class mutation_log_test;

///////////////////////////////////////////////
//// memory states
///////////////////////////////////////////////
Expand Down Expand Up @@ -396,7 +400,12 @@ class mutation_log_shared : public mutation_log
{
}

virtual ~mutation_log_shared() override { _tracker.cancel_outstanding_tasks(); }
virtual ~mutation_log_shared() override
{
close();
_tracker.cancel_outstanding_tasks();
}

virtual ::dsn::task_ptr append(mutation_ptr &mu,
dsn::task_code callback_code,
dsn::task_tracker *tracker,
Expand Down Expand Up @@ -435,25 +444,28 @@ class mutation_log_shared : public mutation_log
perf_counter_wrapper *_write_size_counter;
};

class mutation_log_private : public mutation_log
class mutation_log_private : public mutation_log, private replica_base
{
public:
// Parameters:
// - batch_buffer_max_count, batch_buffer_bytes
// The hint of limited size for the write buffer storing the pending mutations.
// Note that the actual log block is still possible to be larger than the
// hinted size.
mutation_log_private(const std::string &dir,
int32_t max_log_file_mb,
gpid gpid,
replica *r,
uint32_t batch_buffer_bytes,
uint32_t batch_buffer_max_count,
uint64_t batch_buffer_flush_interval_ms)
: mutation_log(dir, max_log_file_mb, gpid, r),
_batch_buffer_bytes(batch_buffer_bytes),
_batch_buffer_max_count(batch_buffer_max_count),
_batch_buffer_flush_interval_ms(batch_buffer_flush_interval_ms)
uint64_t batch_buffer_flush_interval_ms);

~mutation_log_private() override
{
mutation_log_private::init_states();
close();
_tracker.cancel_outstanding_tasks();
}

virtual ~mutation_log_private() override { _tracker.cancel_outstanding_tasks(); }
virtual ::dsn::task_ptr append(mutation_ptr &mu,
dsn::task_code callback_code,
dsn::task_tracker *tracker,
Expand Down Expand Up @@ -639,5 +651,5 @@ class log_file : public ref_counter
// for write, the value is set by write_file_header().
replica_log_info_map _previous_log_max_decrees;
};
}
} // namespace
} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/dist/replication/lib/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class ::dsn::replication::replica;
friend class ::dsn::replication::potential_secondary_context;
friend class ::dsn::replication::cold_backup_context;
friend class mock_replica_stub;

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
closing_replicas; // <gpid, <close_task, replica, app_info, replica_info> >
Expand Down
72 changes: 72 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#pragma once

#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/filesystem.h>

#include "dist/replication/lib/replica.h"
#include "dist/replication/lib/replica_stub.h"

namespace dsn {
namespace replication {

class mock_replica : public replica
{
public:
mock_replica(replica_stub *stub, gpid gpid, const app_info &app, const char *dir)
: replica(stub, gpid, app, dir, false)
{
}

~mock_replica() override {}
};

inline std::unique_ptr<mock_replica> create_mock_replica(replica_stub *stub,
int appid = 1,
int partition_index = 1,
const char *dir = "./")
{
gpid gpid(appid, partition_index);
app_info app_info;
app_info.app_type = "replica";
app_info.app_name = "temp";

return make_unique<mock_replica>(stub, gpid, app_info, dir);
}

class mock_replica_stub : public replica_stub
{
public:
mock_replica_stub() = default;

~mock_replica_stub() override = default;
};

} // namespace replication
} // namespace dsn
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,27 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/
#include "dist/replication/lib/mutation_log.h"
#include "dist/replication/test/replica_test/unit_test/replica_test_base.h"

#include <dsn/utility/filesystem.h>
#include <gtest/gtest.h>
#include <chrono>
#include <condition_variable>

using namespace ::dsn;
using namespace ::dsn::replication;
namespace dsn {
namespace replication {

TEST(replication, mutation_log_learn)
class mutation_log_test : public replica_test_base
{
};

TEST_F(mutation_log_test, learn)
{
std::chrono::steady_clock clock;
gpid gpid(1, 1);
std::string str = "hello, world!";
std::string logp = "./test-log";
std::string logp = _log_dir;

// prepare mutations
std::vector<mutation_ptr> mutations;
Expand Down Expand Up @@ -86,7 +84,8 @@ TEST(replication, mutation_log_learn)

// writing logs
time_tic = clock.now();
mutation_log_ptr mlog = new mutation_log_private(logp, 32, gpid, nullptr, 4096, 512, 10000);
mutation_log_ptr mlog =
new mutation_log_private(logp, 32, gpid, _replica.get(), 4096, 512, 10000);
mlog->open(nullptr, nullptr);
for (auto &mu : mutations) {
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
Expand All @@ -111,7 +110,7 @@ TEST(replication, mutation_log_learn)

// reading logs
time_tic = clock.now();
mlog = new mutation_log_private(logp, 1, gpid, nullptr, 1024, 512, 10000);
mlog = new mutation_log_private(logp, 1, gpid, _replica.get(), 1024, 512, 10000);
mlog->open([](int log_length, mutation_ptr &mu) -> bool { return true; }, nullptr);
time_toc = clock.now();
std::cout
Expand Down Expand Up @@ -182,3 +181,6 @@ TEST(replication, mutation_log_learn)
utils::filesystem::remove_path(logp);
}
}

} // namespace replication
} // namespace dsn
Loading