Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Supports learner mode. #1

Merged
merged 14 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ macro(use_cxx11)
if(CMAKE_VERSION VERSION_LESS "3.1.3")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
else()
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()
endmacro(use_cxx11)
Expand Down
53 changes: 52 additions & 1 deletion src/braft/configuration_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,46 @@ int ConfigurationManager::add(const ConfigurationEntry& entry) {
return 0;
}

int ConfigurationManager::add_learner_conf(const ConfigurationEntry& entry) {
if (!_learner_configurations.empty()) {
if (_learner_configurations.back().id.index >= entry.id.index) {
CHECK(false) << "Did you forget to call truncate_suffix before "
" the last log index goes back";
return -1;
}
}
_learner_configurations.push_back(entry);
return 0;
}

void ConfigurationManager::truncate_prefix(const int64_t first_index_kept) {
while (!_configurations.empty()
&& _configurations.front().id.index < first_index_kept) {
_configurations.pop_front();
}

while (!_learner_configurations.empty() &&
_learner_configurations.front().id.index < first_index_kept) {
_learner_configurations.pop_front();
}
}

void ConfigurationManager::truncate_suffix(const int64_t last_index_kept) {
while (!_configurations.empty()
&& _configurations.back().id.index > last_index_kept) {
_configurations.pop_back();
}

while (!_learner_configurations.empty()
&& _learner_configurations.back().id.index > last_index_kept) {
_learner_configurations.pop_back();
}
}

void ConfigurationManager::set_snapshot(const ConfigurationEntry& entry) {
void ConfigurationManager::set_snapshot(const ConfigurationEntry& entry, const ConfigurationEntry& learner_entry) {
CHECK_GE(entry.id, _snapshot.id);
_snapshot = entry;
_learner_snapshot = learner_entry;
}

void ConfigurationManager::get(int64_t last_included_index,
Expand All @@ -70,11 +93,39 @@ void ConfigurationManager::get(int64_t last_included_index,
*conf = *it;
}

void ConfigurationManager::get_learner_conf(int64_t last_included_index,
ConfigurationEntry* conf) {
if (_learner_configurations.empty()) {
CHECK_GE(last_included_index, _snapshot.id.index);
*conf = _snapshot;
return;
}
std::deque<ConfigurationEntry>::iterator it;
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
for (it = _learner_configurations.begin(); it != _learner_configurations.end(); ++it) {
if (it->id.index > last_included_index) {
break;
}
}
if (it == _learner_configurations.begin()) {
*conf = _snapshot;
return;
}
--it;
*conf = *it;
}

const ConfigurationEntry& ConfigurationManager::last_configuration() const {
if (!_configurations.empty()) {
return _configurations.back();
}
return _snapshot;
}

const ConfigurationEntry& ConfigurationManager::last_learner_configuration() const {
if (!_learner_configurations.empty()) {
return _learner_configurations.back();
}
return _learner_snapshot;
}

} // namespace braft
11 changes: 10 additions & 1 deletion src/braft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,31 @@ class ConfigurationManager {
// add new configuration at index
int add(const ConfigurationEntry& entry);

// add new learner configuration at index
int add_learner_conf(const ConfigurationEntry& entry);

// [1, first_index_kept) are being discarded
void truncate_prefix(int64_t first_index_kept);

// (last_index_kept, infinity) are being discarded
void truncate_suffix(int64_t last_index_kept);

void set_snapshot(const ConfigurationEntry& snapshot);
void set_snapshot(const ConfigurationEntry& snapshot, const ConfigurationEntry& learner_entry);

void get(int64_t last_included_index, ConfigurationEntry* entry);

void get_learner_conf(int64_t last_included_index, ConfigurationEntry* entry);

const ConfigurationEntry& last_configuration() const;

const ConfigurationEntry& last_learner_configuration() const;

private:

std::deque<ConfigurationEntry> _configurations;
std::deque<ConfigurationEntry> _learner_configurations;
ConfigurationEntry _snapshot;
ConfigurationEntry _learner_snapshot;
};

} // namespace braft
Expand Down
1 change: 1 addition & 0 deletions src/braft/enum.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ enum EntryType {
ENTRY_TYPE_NO_OP = 1;
ENTRY_TYPE_DATA = 2;
ENTRY_TYPE_CONFIGURATION= 3;
ENTRY_TYPE_LEARNER_CHANGE = 4;
};

enum ErrorType {
Expand Down
12 changes: 12 additions & 0 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// Xiong,Kai([email protected])

#include <butil/logging.h>
#include "braft/configuration_manager.h"
#include "braft/raft.h"
#include "braft/log_manager.h"
#include "braft/node.h"
Expand Down Expand Up @@ -288,6 +289,10 @@ void FSMCaller::do_committed(int64_t committed_index) {
iter_impl.entry()->id.index);
}
}
if (iter_impl.entry()->type == ENTRY_TYPE_LEARNER_CHANGE) {
_node->on_learner_config_apply(iter_impl.entry());
// TODO: should we notify state machine?
}
// For other entries, we have nothing to do besides flush the
// pending tasks and run this closure to notify the caller that the
// entries before this one were successfully committed and applied.
Expand Down Expand Up @@ -335,6 +340,8 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) {
meta.set_last_included_term(_last_applied_term);
ConfigurationEntry conf_entry;
_log_manager->get_configuration(last_applied_index, &conf_entry);
ConfigurationEntry learner_conf_entry;
_log_manager->get_learner_configuration(last_applied_index, &learner_conf_entry);
for (Configuration::const_iterator
iter = conf_entry.conf.begin();
iter != conf_entry.conf.end(); ++iter) {
Expand All @@ -345,6 +352,11 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) {
iter != conf_entry.old_conf.end(); ++iter) {
*meta.add_old_peers() = iter->to_string();
}
for (Configuration::const_iterator
iter = learner_conf_entry.conf.begin();
iter != learner_conf_entry.conf.end(); ++iter) {
*meta.add_learners() = iter->to_string();
}

SnapshotWriter* writer = done->start(meta);
if (!writer) {
Expand Down
41 changes: 41 additions & 0 deletions src/braft/log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <butil/fd_utility.h> // butil::make_close_on_exec
#include <brpc/reloadable_flags.h> //

#include "braft/enum.pb.h"
#include "braft/local_storage.pb.h"
#include "braft/log_entry.h"
#include "braft/protobuf_file.h"
Expand Down Expand Up @@ -323,6 +324,25 @@ int Segment::load(ConfigurationManager* configuration_manager) {
break;
}
}
if (header.type == ENTRY_TYPE_LEARNER_CHANGE) {
butil::IOBuf data;
if (_load_entry(entry_off, NULL, &data, skip_len) != 0) {
break;
}
scoped_refptr<LogEntry> entry = new LogEntry();
entry->id.index = i;
entry->id.term = header.term;
butil::Status status = parse_learner_meta(data, entry);
if (status.ok()) {
ConfigurationEntry conf_entry(*entry);
configuration_manager->add_learner_conf(conf_entry);
} else {
LOG(ERROR) << "fail to parse learner meta, path: " << _path
<< " entry_off " << entry_off;
ret = -1;
break;
}
}
_offset_and_term.push_back(std::make_pair(entry_off, header.term));
++actual_last_index;
entry_off += skip_len;
Expand Down Expand Up @@ -395,6 +415,16 @@ int Segment::append(const LogEntry* entry) {
}
}
break;
case ENTRY_TYPE_LEARNER_CHANGE:
{
butil::Status status = serialize_learner_meta(entry, data);
if (!status.ok()) {
LOG(ERROR) << "Fail to serialize learner's ConfigurationPBMeta, path: "
<< _path;
return -1;
}
}
break;
default:
LOG(FATAL) << "unknow entry type: " << entry->type
<< ", path: " << _path;
Expand Down Expand Up @@ -494,6 +524,17 @@ LogEntry* Segment::get(const int64_t index) const {
}
}
break;
case ENTRY_TYPE_LEARNER_CHANGE:
{
butil::Status status = parse_learner_meta(data, entry);
if (!status.ok()) {
LOG(WARNING) << "Fail to parse learner's ConfigurationPBMeta, path: "
<< _path;
ok = false;
break;
}
}
break;
default:
CHECK(false) << "Unknown entry type, path: " << _path;
break;
Expand Down
39 changes: 39 additions & 0 deletions src/braft/log_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,43 @@ butil::Status serialize_configuration_meta(const LogEntry* entry, butil::IOBuf&
return status;
}

butil::Status parse_learner_meta(const butil::IOBuf& data, LogEntry* entry) {
butil::Status status;
if (data.size() == 0) {
// All the learners were removed.
entry->peers = new std::vector<PeerId>;
return status;
}
ConfigurationPBMeta meta;
butil::IOBufAsZeroCopyInputStream wrapper(data);
if (!meta.ParseFromZeroCopyStream(&wrapper)) {
status.set_error(EINVAL, "Fail to parse learner's ConfigurationPBMeta");
return status;
}
entry->peers = new std::vector<PeerId>;
for (int j = 0; j < meta.peers_size(); ++j) {
entry->peers->push_back(PeerId(meta.peers(j)));
}
CHECK_EQ(meta.old_peers_size(), 0) << "Learner's old_peers should be empty";
return status;
}

butil::Status serialize_learner_meta(const LogEntry* entry, butil::IOBuf& data) {
butil::Status status;
if (entry->peers == NULL || entry->peers->empty()) {
// All the learners were removed.
return status;
}
ConfigurationPBMeta meta;
for (size_t i = 0; i < entry->peers->size(); ++i) {
meta.add_peers((*(entry->peers))[i].to_string());
}
CHECK(!entry->old_peers) << "Learner's old_peers should be empty";
butil::IOBufAsZeroCopyOutputStream wrapper(&data);
if (!meta.SerializeToZeroCopyStream(&wrapper)) {
status.set_error(EINVAL, "Fail to serialize learner's ConfigurationPBMeta");
}
return status;
}

}
4 changes: 4 additions & 0 deletions src/braft/log_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ butil::Status parse_configuration_meta(const butil::IOBuf& data, LogEntry* entry

butil::Status serialize_configuration_meta(const LogEntry* entry, butil::IOBuf& data);

butil::Status parse_learner_meta(const butil::IOBuf& data, LogEntry* entry);

butil::Status serialize_learner_meta(const LogEntry* entry, butil::IOBuf& data);

} // namespace braft

#endif //BRAFT_LOG_ENTRY_H
22 changes: 21 additions & 1 deletion src/braft/log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
#include <bthread/unstable.h> // bthread_flush
#include <bthread/countdown_event.h> // bthread::CountdownEvent
#include <brpc/reloadable_flags.h> // BRPC_VALIDATE_GFLAG
#include <cstddef>
#include "braft/configuration.h"
#include "braft/enum.pb.h"
#include "braft/storage.h" // LogStorage
#include "braft/fsm_caller.h" // FSMCaller

Expand Down Expand Up @@ -635,11 +638,18 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) {
for (int i = 0; i < meta->old_peers_size(); ++i) {
old_conf.add_peer(meta->old_peers(i));
}
Configuration learner_conf;
for (int i = 0; i < meta->learners_size(); ++i) {
learner_conf.add_peer(meta->learners(i));
}
ConfigurationEntry entry;
entry.id = LogId(meta->last_included_index(), meta->last_included_term());
entry.conf = conf;
entry.old_conf = old_conf;
_config_manager->set_snapshot(entry);
ConfigurationEntry learner_entry;
learner_entry.id = LogId(meta->last_included_index(), meta->last_included_term());
learner_entry.conf = learner_conf;
_config_manager->set_snapshot(entry, learner_entry);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Learner 不用两阶段,所以 learner 只用到了 peer,没有用到 old peer。

int64_t term = unsafe_get_term(meta->last_included_index());

const LogId last_but_one_snapshot_id = _last_snapshot_id;
Expand Down Expand Up @@ -783,6 +793,16 @@ void LogManager::get_configuration(const int64_t index, ConfigurationEntry* conf
return _config_manager->get(index, conf);
}

void LogManager::get_learner_configuration(const int64_t index, ConfigurationEntry* conf) {
BAIDU_SCOPED_LOCK(_mutex);
return _config_manager->get_learner_conf(index, conf);
}

void LogManager::set_learner_configuration(const ConfigurationEntry& conf) {
BAIDU_SCOPED_LOCK(_mutex);
_config_manager->add_learner_conf(conf);
}

bool LogManager::check_and_set_configuration(ConfigurationEntry* current) {
if (current == NULL) {
CHECK(false) << "current should not be NULL";
Expand Down
4 changes: 4 additions & 0 deletions src/braft/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ class BAIDU_CACHELINE_ALIGNMENT LogManager {

void get_configuration(int64_t index, ConfigurationEntry* conf);

void get_learner_configuration(int64_t index, ConfigurationEntry* conf);

void set_learner_configuration(const ConfigurationEntry& conf);

// Check if |current| should be updated to the latest configuration
// Returns true and |current| is assigned to the lastest configuration, returns
// false otherweise
Expand Down
Loading