diff --git a/kvrocks.conf b/kvrocks.conf index 0ff0ce50508..13b1fb6c7f4 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -174,6 +174,20 @@ slave-read-only yes # By default the priority is 100. slave-priority 100 +# Change the default timeout in milliseconds for socket connect during replication. +# The default value is 3100, and 0 means no timeout. +# +# If the master is unreachable before connecting, not having a timeout may block future +# 'clusterx setnodes' commands because the replication thread is blocked on connect. +replication-connect-timeout-ms 3100 + +# Change the default timeout in milliseconds for socket recv during fullsync. +# The default value is 3200, and 0 means no timeout. +# +# If the master is unreachable when fetching SST files, not having a timeout may block +# future 'clusterx setnodes' commands because the replication thread is blocked on recv. +replication-recv-timeout-ms 3200 + # TCP listen() backlog. # # In high requests-per-second environments you need an high backlog in order diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index dff2d3d7956..cd7fe197a76 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -252,7 +252,6 @@ void ReplicationThread::CallbacksStateMachine::Start() { } uint64_t last_connect_timestamp = 0; - int connect_timeout_ms = 3100; while (!repl_->stop_flag_ && bev == nullptr) { if (util::GetTimeStampMS() - last_connect_timestamp < 1000) { @@ -260,7 +259,7 @@ void ReplicationThread::CallbacksStateMachine::Start() { sleep(1); } last_connect_timestamp = util::GetTimeStampMS(); - auto cfd = util::SockConnect(repl_->host_, repl_->port_, connect_timeout_ms); + auto cfd = util::SockConnect(repl_->host_, repl_->port_, repl_->srv_->GetConfig()->replication_connect_timeout_ms); if (!cfd) { LOG(ERROR) << "[replication] Failed to connect the master, err: " << cfd.Msg(); continue; @@ -777,7 +776,10 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir, } auto exit = MakeScopeExit([ssl] { SSL_free(ssl); }); #endif - int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl).Prefixed("connect the server err")); + int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl, + this->srv_->GetConfig()->replication_connect_timeout_ms, + this->srv_->GetConfig()->replication_recv_timeout_ms) + .Prefixed("connect the server err")); #ifdef ENABLE_OPENSSL exit.Disable(); #endif @@ -874,6 +876,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT); if (!line) { if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) { + if (s.Is()) { + if (stop_flag_) { + return {Status::NotOK, "replication thread was stopped"}; + } + continue; + } return std::move(s).Prefixed("read size"); } continue; @@ -907,6 +915,12 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str remain -= data_len; } else { if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) { + if (s.Is()) { + if (stop_flag_) { + return {Status::NotOK, "replication thread was stopped"}; + } + continue; + } return std::move(s).Prefixed("read sst file"); } } diff --git a/src/common/io_util.cc b/src/common/io_util.cc index 35fa80d9472..23cccc69fb7 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -502,7 +502,12 @@ StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may howmuch = BUFFER_SIZE; } if (howmuch = SSL_read(ssl, tmp, howmuch); howmuch <= 0) { - return {Status::NotOK, fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))}; + int err = SSL_get_error(ssl, howmuch); + if (err == SSL_ERROR_ZERO_RETURN) { + return {Status::EndOfFile, "EOF encountered while reading from SSL connection"}; + } + return {(err == SSL_ERROR_WANT_READ) ? Status::TryAgain : Status::NotOK, + fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))}; } if (int ret = evbuffer_add(buf, tmp, howmuch); ret == -1) { @@ -514,8 +519,11 @@ StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, [[may #endif if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) { return ret; + } else if (ret == 0) { + return {Status::EndOfFile, "EOF encountered while reading from socket"}; } else { - return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; + return {(errno == EWOULDBLOCK || errno == EAGAIN) ? Status::TryAgain : Status::NotOK, + fmt::format("failed to read from socket: {}", strerror(errno))}; } } diff --git a/src/common/status.h b/src/common/status.h index b4b228a05ef..823e5681c9b 100644 --- a/src/common/status.h +++ b/src/common/status.h @@ -75,6 +75,10 @@ class [[nodiscard]] Status { // Search NoPrefixMatched, TypeMismatched, + + // IO + TryAgain, + EndOfFile, }; Status() : impl_{nullptr} {} diff --git a/src/config/config.cc b/src/config/config.cc index 165e352f25d..c2491844253 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -203,6 +203,8 @@ Config::Config() { {"slave-empty-db-before-fullsync", false, new YesNoField(&slave_empty_db_before_fullsync, false)}, {"slave-priority", false, new IntField(&slave_priority, 100, 0, INT_MAX)}, {"slave-read-only", false, new YesNoField(&slave_readonly, true)}, + {"replication-connect-timeout-ms", false, new IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)}, + {"replication-recv-timeout-ms", false, new IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)}, {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)}, {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)}, {"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)}, diff --git a/src/config/config.h b/src/config/config.h index 3b9d99de2e9..3dcc8d87002 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -105,6 +105,8 @@ struct Config { bool slave_serve_stale_data = true; bool slave_empty_db_before_fullsync = false; int slave_priority = 100; + int replication_connect_timeout_ms = 3100; + int replication_recv_timeout_ms = 3200; int max_db_size = 0; int max_replication_mb = 0; int max_io_mb = 0;