Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(replication): slave blocks until keepalive timer is reached when master is gone without fin/rst notification #2662

Merged
merged 17 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,20 @@ slave-read-only yes
# By default the priority is 100.
slave-priority 100

# Timeout in milliseconds for the socket connect to the master during replication.
# The default value is 3100, and 0 means no timeout.
#
# If the master is unreachable before connecting, not having a timeout may block future
# 'clusterx setnodes' commands because the replication thread is blocked on connect.
replication-connect-timeout-ms 3100

# Timeout in milliseconds for socket recv while fetching files during fullsync.
# The default value is 3200, and 0 means no timeout.
#
# If the master is unreachable when fetching SST files, not having a timeout may block
# future 'clusterx setnodes' commands because the replication thread is blocked on recv.
replication-recv-timeout-ms 3200

# TCP listen() backlog.
#
# In high requests-per-second environments you need a high backlog in order
Expand Down
20 changes: 17 additions & 3 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,14 @@ void ReplicationThread::CallbacksStateMachine::Start() {
}

uint64_t last_connect_timestamp = 0;
int connect_timeout_ms = 3100;

while (!repl_->stop_flag_ && bev == nullptr) {
if (util::GetTimeStampMS() - last_connect_timestamp < 1000) {
// prevent frequent re-connect when the master is down with the connection refused error
sleep(1);
}
last_connect_timestamp = util::GetTimeStampMS();
auto cfd = util::SockConnect(repl_->host_, repl_->port_, connect_timeout_ms);
auto cfd = util::SockConnect(repl_->host_, repl_->port_, repl_->srv_->GetConfig()->replication_connect_timeout_ms);
if (!cfd) {
LOG(ERROR) << "[replication] Failed to connect the master, err: " << cfd.Msg();
continue;
Expand Down Expand Up @@ -777,7 +776,10 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
}
auto exit = MakeScopeExit([ssl] { SSL_free(ssl); });
#endif
int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl).Prefixed("connect the server err"));
int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl,
this->srv_->GetConfig()->replication_connect_timeout_ms,
this->srv_->GetConfig()->replication_recv_timeout_ms)
.Prefixed("connect the server err"));
#ifdef ENABLE_OPENSSL
exit.Disable();
#endif
Expand Down Expand Up @@ -874,6 +876,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str
UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read size");
}
continue;
Expand Down Expand Up @@ -907,6 +915,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str
remain -= data_len;
} else {
if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) {
if (s.Is<Status::TryAgain>()) {
if (stop_flag_) {
return {Status::NotOK, "replication thread was stopped"};
}
continue;
}
return std::move(s).Prefixed("read sst file");
}
}
Expand Down
12 changes: 10 additions & 2 deletions src/common/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,12 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may
howmuch = BUFFER_SIZE;
}
if (howmuch = SSL_read(ssl, tmp, howmuch); howmuch <= 0) {
return {Status::NotOK, fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))};
int err = SSL_get_error(ssl, howmuch);
if (err == SSL_ERROR_ZERO_RETURN) {
return {Status::EndOfFile, "EOF encountered while reading from SSL connection"};
}
return {(err == SSL_ERROR_WANT_READ) ? Status::TryAgain : Status::NotOK,
fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))};
}

if (int ret = evbuffer_add(buf, tmp, howmuch); ret == -1) {
Expand All @@ -514,8 +519,11 @@ StatusOr<int> EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may
#endif
if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) {
PragmaTwice marked this conversation as resolved.
Show resolved Hide resolved
return ret;
} else if (ret == 0) {
return {Status::EndOfFile, "EOF encountered while reading from socket"};
} else {
return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))};
return {(errno == EWOULDBLOCK || errno == EAGAIN) ? Status::TryAgain : Status::NotOK,
fmt::format("failed to read from socket: {}", strerror(errno))};
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class [[nodiscard]] Status {
// Search
NoPrefixMatched,
TypeMismatched,

// IO
TryAgain,
EndOfFile,
};

Status() : impl_{nullptr} {}
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ Config::Config() {
{"slave-empty-db-before-fullsync", false, new YesNoField(&slave_empty_db_before_fullsync, false)},
{"slave-priority", false, new IntField(&slave_priority, 100, 0, INT_MAX)},
{"slave-read-only", false, new YesNoField(&slave_readonly, true)},
{"replication-connect-timeout-ms", false, new IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
{"replication-recv-timeout-ms", false, new IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
{"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ struct Config {
bool slave_serve_stale_data = true;
bool slave_empty_db_before_fullsync = false;
int slave_priority = 100;
// Timeout in milliseconds for the socket connect during replication; 0 means no timeout.
// Matches the 3100 default documented in kvrocks.conf and registered for
// "replication-connect-timeout-ms" in Config::Config().
int replication_connect_timeout_ms = 3100;
torwig marked this conversation as resolved.
Show resolved Hide resolved
// Timeout in milliseconds for socket recv during fullsync; 0 means no timeout.
// Matches the 3200 default documented in kvrocks.conf and registered for
// "replication-recv-timeout-ms" in Config::Config().
int replication_recv_timeout_ms = 3200;
sryanyuan marked this conversation as resolved.
Show resolved Hide resolved
int max_db_size = 0;
int max_replication_mb = 0;
int max_io_mb = 0;
Expand Down
Loading