From 4597af8b0f7d767067c23d1bc5123a6b30a054fb Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 07:27:03 +0800 Subject: [PATCH 01/15] Support TLS for replication --- kvrocks.conf | 7 +++ src/cluster/replication.cc | 74 ++++++++++++++++++++++++------- src/cluster/replication.h | 7 +-- src/common/io_util.cc | 91 +++++++++++++++++++++++++++++++++++++- src/common/io_util.h | 13 ++++++ src/config/config.cc | 1 + src/config/config.h | 3 ++ src/server/worker.cc | 6 +-- 8 files changed, 177 insertions(+), 25 deletions(-) diff --git a/kvrocks.conf b/kvrocks.conf index 6e703edcef2..1113de40602 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -380,6 +380,13 @@ redis-cursor-compatible no # # tls-session-cache-timeout 60 +# By default, a replica does not attempt to establish a TLS connection +# with its master. +# +# Use the following directive to enable TLS on replication links. +# +# tls-replication yes + ################################## SLOW LOG ################################### # The Kvrocks Slow Log is a mechanism to log queries that exceeded a specified diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index 39a2ade7c2c..a6c6c8c39b2 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -37,6 +37,7 @@ #include "fmt/format.h" #include "io_util.h" #include "rocksdb_crc32c.h" +#include "scope_exit.h" #include "server/redis_reply.h" #include "server/server.h" #include "status.h" @@ -45,6 +46,12 @@ #include "time_util.h" #include "unique_fd.h" +#ifdef ENABLE_OPENSSL +#include +#include +#include +#endif + Status FeedSlaveThread::Start() { auto s = util::CreateThread("feed-replica", [this] { sigset_t mask, omask; @@ -54,7 +61,7 @@ Status FeedSlaveThread::Start() { sigaddset(&mask, SIGHUP); sigaddset(&mask, SIGPIPE); pthread_sigmask(SIG_BLOCK, &mask, &omask); - auto s = util::SockSend(conn_->GetFD(), "+OK\r\n"); + auto s = util::SockSend(conn_->GetFD(), "+OK\r\n", conn_->GetBufferEvent()); if (!s.IsOK()) { LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg(); return; @@ -85,7 +92,7 @@ void FeedSlaveThread::Join() { void FeedSlaveThread::checkLivenessIfNeed() { if (++interval_ % 1000) return; const auto ping_command = redis::BulkString("ping"); - auto s = util::SockSend(conn_->GetFD(), ping_command); + auto s = util::SockSend(conn_->GetFD(), ping_command, conn_->GetBufferEvent()); if (!s.IsOK()) { LOG(ERROR) << "Ping slave[" << conn_->GetAddr() << "] err: " << s.Msg() << ", would stop the thread"; Stop(); @@ -134,7 +141,7 @@ void FeedSlaveThread::loop() { if (is_first_repl_batch || batches_bulk.size() >= kMaxDelayBytes || updates_in_batches >= kMaxDelayUpdates || srv_->storage->LatestSeqNumber() - batch.sequence <= kMaxDelayUpdates) { // Send entire bulk which contain multiple batches - auto s = util::SockSend(conn_->GetFD(), batches_bulk); + auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent()); if (!s.IsOK()) { LOG(ERROR) << "Write error while sending batch to slave: " << s.Msg() << ". batches: 0x" << util::StringToHex(batches_bulk); @@ -261,12 +268,35 @@ void ReplicationThread::CallbacksStateMachine::Start() { LOG(ERROR) << "[replication] Failed to connect the master, err: " << cfd.Msg(); continue; } +#ifdef ENABLE_OPENSSL + SSL *ssl = nullptr; + if (repl_->srv_->GetConfig()->tls_replication) { + ssl = SSL_new(repl_->srv_->ssl_ctx.get()); + if (!ssl) { + LOG(ERROR) << "Failed to construct SSL structure for new connection: " << SSLErrors{}; + evutil_closesocket(*cfd); + return; + } + bev = bufferevent_openssl_socket_new(repl_->base_, *cfd, ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_CLOSE_ON_FREE); + } else { + bev = bufferevent_socket_new(repl_->base_, *cfd, BEV_OPT_CLOSE_ON_FREE); + } +#else bev = bufferevent_socket_new(repl_->base_, *cfd, BEV_OPT_CLOSE_ON_FREE); +#endif if (bev == nullptr) { +#ifdef ENABLE_OPENSSL + if (ssl) SSL_free(ssl); +#endif close(*cfd); LOG(ERROR) << "[replication] Failed to create the event socket"; continue; } +#ifdef ENABLE_OPENSSL + if (repl_->srv_->GetConfig()->tls_replication) { + bufferevent_openssl_set_allow_dirty_shutdown(bev, 1); + } +#endif } if (bev == nullptr) { // failed to connect the master and received the stop signal return; @@ -743,9 +773,19 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir, if (this->stop_flag_) { return {Status::NotOK, "replication thread was stopped"}; } - int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_).Prefixed("connect the server err")); + ssl_st *ssl = nullptr; +#ifdef ENABLE_OPENSSL + if (this->srv_->GetConfig()->tls_replication) { + ssl = SSL_new(this->srv_->ssl_ctx.get()); + } + auto exit = MakeScopeExit([ssl] { SSL_free(ssl); }); +#endif + int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl).Prefixed("connect the server err")); +#ifdef ENABLE_OPENSSL + exit.Disable(); +#endif UniqueFD unique_fd{sock_fd}; - auto s = this->sendAuth(sock_fd); + auto s = this->sendAuth(sock_fd, ssl); if (!s.IsOK()) { return s.Prefixed("send the auth command err"); } @@ -785,12 +825,12 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir, // command, so we need to fetch all files by multiple command interactions. if (srv_->GetConfig()->master_use_repl_port) { for (unsigned i = 0; i < fetch_files.size(); i++) { - s = this->fetchFiles(sock_fd, dir, {fetch_files[i]}, {crcs[i]}, fn); + s = this->fetchFiles(sock_fd, dir, {fetch_files[i]}, {crcs[i]}, fn, ssl); if (!s.IsOK()) break; } } else { if (!fetch_files.empty()) { - s = this->fetchFiles(sock_fd, dir, fetch_files, crcs, fn); + s = this->fetchFiles(sock_fd, dir, fetch_files, crcs, fn, ssl); } } return s; @@ -805,13 +845,13 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir, return Status::OK(); } -Status ReplicationThread::sendAuth(int sock_fd) { +Status ReplicationThread::sendAuth(int sock_fd, ssl_st *ssl) { // Send auth when needed std::string auth = srv_->GetConfig()->masterauth; if (!auth.empty()) { UniqueEvbuf evbuf; const auto auth_command = redis::MultiBulkString({"AUTH", auth}); - auto s = util::SockSend(sock_fd, auth_command); + auto s = util::SockSend(sock_fd, auth_command, ssl); if (!s.IsOK()) return s.Prefixed("send auth command err"); while (true) { if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) { @@ -829,15 +869,15 @@ Status ReplicationThread::sendAuth(int sock_fd) { } Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, const std::string &file, - uint32_t crc, const FetchFileCallback &fn) { + uint32_t crc, const FetchFileCallback &fn, ssl_st *ssl) { size_t file_size = 0; // Read file size line while (true) { UniqueEvbufReadln line(evbuf, EVBUFFER_EOL_CRLF_STRICT); if (!line) { - if (evbuffer_read(evbuf, sock_fd, -1) <= 0) { - return {Status::NotOK, fmt::format("read size: {}", strerror(errno))}; + if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) { + return std::move(s).Prefixed("read size"); } continue; } @@ -869,8 +909,8 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str tmp_crc = rocksdb::crc32c::Extend(tmp_crc, data, data_len); remain -= data_len; } else { - if (evbuffer_read(evbuf, sock_fd, -1) <= 0) { - return {Status::NotOK, fmt::format("read sst file: {}", strerror(errno))}; + if (auto s = util::EvbufferRead(evbuf, sock_fd, -1, ssl); !s) { + return std::move(s).Prefixed("read sst file"); } } } @@ -888,7 +928,7 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf, const std::str } Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const std::vector &files, - const std::vector &crcs, const FetchFileCallback &fn) { + const std::vector &crcs, const FetchFileCallback &fn, ssl_st *ssl) { std::string files_str; for (const auto &file : files) { files_str += file; @@ -897,13 +937,13 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const files_str.pop_back(); const auto fetch_command = redis::MultiBulkString({"_fetch_file", files_str}); - auto s = util::SockSend(sock_fd, fetch_command); + auto s = util::SockSend(sock_fd, fetch_command, ssl); if (!s.IsOK()) return s.Prefixed("send fetch file command"); UniqueEvbuf evbuf; for (unsigned i = 0; i < files.size(); i++) { DLOG(INFO) << "[fetch] Start to fetch file " << files[i]; - s = fetchFile(sock_fd, evbuf.get(), dir, files[i], crcs[i], fn); + s = fetchFile(sock_fd, evbuf.get(), dir, files[i], crcs[i], fn, ssl); if (!s.IsOK()) { s = Status(Status::NotOK, "fetch file err: " + s.Msg()); LOG(WARNING) << "[fetch] Fail to fetch file " << files[i] << ", err: " << s.Msg(); diff --git a/src/cluster/replication.h b/src/cluster/replication.h index 2b6f4c86cc2..8e5e3fe793b 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -32,6 +32,7 @@ #include #include "event_util.h" +#include "io_util.h" #include "server/redis_connection.h" #include "status.h" #include "storage/storage.h" @@ -194,11 +195,11 @@ class ReplicationThread : private EventCallbackBase { static CBState fullSyncReadCB(bufferevent *bev, void *ctx); // Synchronized-Blocking ops - Status sendAuth(int sock_fd); + Status sendAuth(int sock_fd, ssl_st *ssl); Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir, const std::string &file, uint32_t crc, - const FetchFileCallback &fn); + const FetchFileCallback &fn, ssl_st *ssl); Status fetchFiles(int sock_fd, const std::string &dir, const std::vector &files, - const std::vector &crcs, const FetchFileCallback &fn); + const std::vector &crcs, const FetchFileCallback &fn, ssl_st *ssl); Status parallelFetchFile(const std::string &dir, const std::vector> &files); static bool isRestoringError(const char *err); static bool isWrongPsyncNum(const char *err); diff --git a/src/common/io_util.cc b/src/common/io_util.cc index cb30a48047b..f68c90ac23a 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -26,13 +26,23 @@ #include #include #include +#include #include #include +#include "fmt/ostream.h" +#include "server/tls_util.h" + #ifdef __linux__ #include #endif +#ifdef ENABLE_OPENSSL +#include + +#include "event2/bufferevent_ssl.h" +#endif + #include "event_util.h" #include "scope_exit.h" #include "unique_fd.h" @@ -387,8 +397,8 @@ std::vector GetLocalIPAddresses() { return ip_addresses; } -template -Status WriteImpl(int fd, std::string_view data, Args &&...args) { +template +Status WriteImpl(FD fd, std::string_view data, Args &&...args) { ssize_t n = 0; while (n < static_cast(data.size())) { ssize_t nwritten = syscall(fd, data.data() + n, data.size() - n, std::forward(args)...); @@ -404,4 +414,81 @@ Status Write(int fd, const std::string &data) { return WriteImpl(fd, data Status Pwrite(int fd, const std::string &data, off_t offset) { return WriteImpl(fd, data, offset); } +Status SockSend(int fd, const std::string &data, ssl_st *ssl) { +#ifdef ENABLE_OPENSSL + if (ssl) { + return WriteImpl(ssl, data); + } else { + return SockSend(fd, data); + } +#else + return SockSend(fd, data); +#endif +} + +Status SockSend(int fd, const std::string &data, bufferevent *bev) { +#ifdef ENABLE_OPENSSL + return SockSend(fd, data, bufferevent_openssl_get_ssl(bev)); +#else + return SockSend(fd, data); +#endif +} + +StatusOr SockConnect(const std::string &host, uint32_t port, ssl_st *ssl, int conn_timeout, int timeout) { +#ifdef ENABLE_OPENSSL + if (ssl) { + auto fd = GET_OR_RET(SockConnect(host, port, conn_timeout, timeout)); + SSL_set_fd(ssl, fd); + + auto bio = BIO_new_socket(fd, BIO_NOCLOSE); + SSL_set_bio(ssl, bio, bio); + + if (int err = SSL_connect(ssl); err != 1) { + BIO_free(bio); + return {Status::NotOK, fmt::format("socket failed to do SSL handshake: {}", fmt::streamed(SSLError(err)))}; + } + + return fd; + } else { + return SockConnect(host, port, conn_timeout, timeout); + } +#else + return SockConnect(host, port, conn_timeout, timeout); +#endif +} + +StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, ssl_st *ssl) { +#ifdef ENABLE_OPENSSL + if (ssl) { + constexpr int BUFFER_SIZE = 4096; + char tmp[BUFFER_SIZE]; + + if (howmuch <= 0 || howmuch > BUFFER_SIZE) { + howmuch = BUFFER_SIZE; + } + if (howmuch = SSL_read(ssl, tmp, howmuch); howmuch <= 0) { + return {Status::NotOK, fmt::format("failed to read from SSL connection: {}", fmt::streamed(SSLError(howmuch)))}; + } + + if (int ret = evbuffer_add(buf, tmp, howmuch); ret == -1) { + return {Status::NotOK, fmt::format("failed to add buffer: {}", strerror(errno))}; + } + + return howmuch; + } else { + if (int ret = evbuffer_read(buf, fd, howmuch); ret != -1) { + return ret; + } else { + return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; + } + } +#else + if (int ret = evbuffer_read(buf, fd, howmuch); ret != -1) { + return ret; + } else { + return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; + } +#endif +} + } // namespace util diff --git a/src/common/io_util.h b/src/common/io_util.h index 8391d2cc129..d9bae04bc56 100644 --- a/src/common/io_util.h +++ b/src/common/io_util.h @@ -22,8 +22,14 @@ #include +#include "event2/util.h" #include "status.h" +// forward declarations +struct ssl_st; +struct bufferevent; +struct evbuffer; + namespace util { sockaddr_in NewSockaddrInet(const std::string &host, uint32_t port); @@ -46,4 +52,11 @@ int AeWait(int fd, int mask, int milliseconds); Status Write(int fd, const std::string &data); Status Pwrite(int fd, const std::string &data, off_t offset); +Status SockSend(int fd, const std::string &data, ssl_st *ssl); +Status SockSend(int fd, const std::string &data, bufferevent *bev); + +StatusOr SockConnect(const std::string &host, uint32_t port, ssl_st *ssl, int conn_timeout = 0, int timeout = 0); + +StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, ssl_st *ssl); + } // namespace util diff --git a/src/config/config.cc b/src/config/config.cc index 08ab4c96ffb..cea31b964ba 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -98,6 +98,7 @@ Config::Config() { {"tls-session-caching", false, new YesNoField(&tls_session_caching, true)}, {"tls-session-cache-size", false, new IntField(&tls_session_cache_size, 1024 * 20, 0, INT_MAX)}, {"tls-session-cache-timeout", false, new IntField(&tls_session_cache_timeout, 300, 0, INT_MAX)}, + {"tls-replication", true, new YesNoField(&tls_replication, false)}, #endif {"workers", true, new IntField(&workers, 8, 1, 256)}, {"timeout", false, new IntField(&timeout, 0, 0, INT_MAX)}, diff --git a/src/config/config.h b/src/config/config.h index 3eabc3bed05..8a40cdc8958 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -74,6 +74,7 @@ struct Config { Config(); ~Config() = default; uint32_t port = 0; + uint32_t tls_port = 0; std::string tls_cert_file; std::string tls_key_file; @@ -88,6 +89,8 @@ struct Config { bool tls_session_caching = true; int tls_session_cache_size = 1024 * 20; int tls_session_cache_timeout = 300; + bool tls_replication = false; + int workers = 0; int timeout = 0; int log_level = 0; diff --git a/src/server/worker.cc b/src/server/worker.cc index befaf97a10d..ef54608c309 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -130,8 +130,8 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS | BEV_OPT_CLOSE_ON_FREE; bufferevent *bev = nullptr; + ssl_st *ssl = nullptr; #ifdef ENABLE_OPENSSL - SSL *ssl = nullptr; if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) { ssl = SSL_new(worker->svr->ssl_ctx.get()); if (!ssl) { @@ -169,7 +169,7 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock s = worker->AddConnection(conn); if (!s.IsOK()) { std::string err_msg = redis::Error("ERR " + s.Msg()); - s = util::SockSend(fd, err_msg); + s = util::SockSend(fd, err_msg, ssl); if (!s.IsOK()) { LOG(WARNING) << "Failed to send error response to socket: " << s.Msg(); } @@ -319,7 +319,7 @@ Status Worker::AddConnection(redis::Connection *c) { return {Status::NotOK, "max number of clients reached"}; } - conns_.insert(std::pair(c->GetFD(), c)); + conns_.emplace(c->GetFD(), c); uint64_t id = svr->GetClientID(); c->SetID(id); From 3edf5d12a53d9956e816a336399aa16c87225e56 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 08:05:10 +0800 Subject: [PATCH 02/15] fix --- src/common/io_util.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/common/io_util.cc b/src/common/io_util.cc index f68c90ac23a..630039dba4f 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -26,7 +26,6 @@ #include #include #include -#include #include #include From c83d423205ef89bd12ac69181064f9efc94dcc8e Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 10:45:31 +0800 Subject: [PATCH 03/15] fix --- src/common/io_util.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/io_util.cc b/src/common/io_util.cc index 630039dba4f..f3fe4824008 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -475,14 +475,14 @@ StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, ssl_s return howmuch; } else { - if (int ret = evbuffer_read(buf, fd, howmuch); ret != -1) { + if (int ret = evbuffer_read(buf, fd, howmuch); ret <= 0) { return ret; } else { return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; } } #else - if (int ret = evbuffer_read(buf, fd, howmuch); ret != -1) { + if (int ret = evbuffer_read(buf, fd, howmuch); ret <= 0) { return ret; } else { return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; From 3d08909638f60e6fe72622159067ce2a7913a2f2 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 13:37:31 +0800 Subject: [PATCH 04/15] fix --- src/common/io_util.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/io_util.cc b/src/common/io_util.cc index f3fe4824008..42beb4127f0 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -475,14 +475,14 @@ StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, ssl_s return howmuch; } else { - if (int ret = evbuffer_read(buf, fd, howmuch); ret <= 0) { + if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) { return ret; } else { return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; } } #else - if (int ret = evbuffer_read(buf, fd, howmuch); ret <= 0) { + if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) { return ret; } else { return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; From d75f85d4ba9a96a180bafef67320da048f5d05bb Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 14:54:16 +0800 Subject: [PATCH 05/15] add test --- tests/gocase/tls/tls_test.go | 38 ++++++++++++++++++++++++++++++++++++ tests/gocase/util/server.go | 4 ++++ 2 files changed, 42 insertions(+) diff --git a/tests/gocase/tls/tls_test.go b/tests/gocase/tls/tls_test.go index 77df2202425..abe42cda6e5 100644 --- a/tests/gocase/tls/tls_test.go +++ b/tests/gocase/tls/tls_test.go @@ -22,7 +22,9 @@ package tls import ( "context" "crypto/tls" + "fmt" "testing" + "time" "github.com/apache/kvrocks/tests/gocase/util" "github.com/redis/go-redis/v9" @@ -136,3 +138,39 @@ func TestTLS(t *testing.T) { require.NoError(t, rdb.ConfigSet(ctx, "tls-ciphers", "DEFAULT").Err()) }) } + +func TestTLSReplica(t *testing.T) { + if !util.TLSEnable() { + t.Skip("TLS tests run only if tls enabled.") + } + + ctx := context.Background() + + srv := util.StartTLSServer(t, map[string]string{"tls-replication": "yes"}) + defer srv.Close() + + defaultTLSConfig, err := util.DefaultTLSConfig() + require.NoError(t, err) + + sc := srv.NewClientWithOption(&redis.Options{TLSConfig: defaultTLSConfig, Addr: srv.TLSAddr()}) + defer func() { require.NoError(t, sc.Close()) }() + + replica := util.StartTLSServer(t, map[string]string{ + "tls-replication": "yes", + "slaveof": fmt.Sprintf("%s %d", srv.Host(), srv.TLSPort()), + }) + defer replica.Close() + + rc := replica.NewClientWithOption(&redis.Options{TLSConfig: defaultTLSConfig, Addr: replica.TLSAddr()}) + defer func() { require.NoError(t, rc.Close()) }() + + t.Run("TLS: Simple test for replication", func(t *testing.T) { + require.Equal(t, rc.Get(ctx, "a").Val(), "") + require.Equal(t, rc.Get(ctx, "b").Val(), "") + require.NoError(t, sc.Set(ctx, "a", "1", 0).Err()) + require.NoError(t, sc.Set(ctx, "b", "2", 0).Err()) + time.Sleep(100 * time.Millisecond) + require.Equal(t, rc.Get(ctx, "a").Val(), "1") + require.Equal(t, rc.Get(ctx, "b").Val(), "2") + }) +} diff --git a/tests/gocase/util/server.go b/tests/gocase/util/server.go index 00a90350020..b61e000b776 100644 --- a/tests/gocase/util/server.go +++ b/tests/gocase/util/server.go @@ -62,6 +62,10 @@ func (s *KvrocksServer) Port() uint64 { return uint64(s.addr.AddrPort().Port()) } +func (s *KvrocksServer) TLSPort() uint64 { + return uint64(s.tlsAddr.AddrPort().Port()) +} + func (s *KvrocksServer) TLSAddr() string { return s.tlsAddr.String() } From 02796fb334b8a8763ef1d65797629f8d55fe398f Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 16:43:46 +0800 Subject: [PATCH 06/15] more tests --- tests/gocase/tls/tls_test.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/tests/gocase/tls/tls_test.go b/tests/gocase/tls/tls_test.go index abe42cda6e5..95520d35d57 100644 --- a/tests/gocase/tls/tls_test.go +++ b/tests/gocase/tls/tls_test.go @@ -146,7 +146,7 @@ func TestTLSReplica(t *testing.T) { ctx := context.Background() - srv := util.StartTLSServer(t, map[string]string{"tls-replication": "yes"}) + srv := util.StartTLSServer(t, map[string]string{}) defer srv.Close() defaultTLSConfig, err := util.DefaultTLSConfig() @@ -164,13 +164,31 @@ func TestTLSReplica(t *testing.T) { rc := replica.NewClientWithOption(&redis.Options{TLSConfig: defaultTLSConfig, Addr: replica.TLSAddr()}) defer func() { require.NoError(t, rc.Close()) }() - t.Run("TLS: Simple test for replication", func(t *testing.T) { + t.Run("TLS: Replication (incremental)", func(t *testing.T) { require.Equal(t, rc.Get(ctx, "a").Val(), "") require.Equal(t, rc.Get(ctx, "b").Val(), "") require.NoError(t, sc.Set(ctx, "a", "1", 0).Err()) require.NoError(t, sc.Set(ctx, "b", "2", 0).Err()) - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) require.Equal(t, rc.Get(ctx, "a").Val(), "1") require.Equal(t, rc.Get(ctx, "b").Val(), "2") }) + + replica2 := util.StartTLSServer(t, map[string]string{ + "tls-replication": "yes", + "slaveof": fmt.Sprintf("%s %d", srv.Host(), srv.TLSPort()), + }) + defer replica2.Close() + + rc2 := replica2.NewClientWithOption(&redis.Options{TLSConfig: defaultTLSConfig, Addr: replica2.TLSAddr()}) + defer func() { require.NoError(t, rc2.Close()) }() + + t.Run("TLS: Replication (full)", func(t *testing.T) { + require.NoError(t, sc.Set(ctx, "c", "3", 0).Err()) + time.Sleep(500 * time.Millisecond) + require.Equal(t, rc2.Get(ctx, "a").Val(), "1") + require.Equal(t, rc2.Get(ctx, "b").Val(), "2") + require.Equal(t, rc2.Get(ctx, "c").Val(), "3") + require.Equal(t, rc.Get(ctx, "c").Val(), "3") + }) } From 54071704333086eda915ab7c3307d1cc9935211d Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 18:52:20 +0800 Subject: [PATCH 07/15] fix cmd_replication.cc --- src/commands/cmd_replication.cc | 10 ++++----- src/common/io_util.cc | 39 +++++++++++++++++++++++++++------ src/common/io_util.h | 4 +++- tests/gocase/tls/tls_test.go | 1 + 4 files changed, 41 insertions(+), 13 deletions(-) diff --git a/src/commands/cmd_replication.cc b/src/commands/cmd_replication.cc index 8ccdfc19de1..23ae269acbe 100644 --- a/src/commands/cmd_replication.cc +++ b/src/commands/cmd_replication.cc @@ -102,7 +102,7 @@ class CommandPSync : public Commander { s = svr->AddSlave(conn, next_repl_seq_); if (!s.IsOK()) { std::string err = "-ERR " + s.Msg() + "\r\n"; - s = util::SockSend(conn->GetFD(), err); + s = util::SockSend(conn->GetFD(), err, conn->GetBufferEvent()); if (!s.IsOK()) { LOG(WARNING) << "failed to send error message to the replica: " << s.Msg(); } @@ -229,7 +229,7 @@ class CommandFetchMeta : public Commander { std::string files; auto s = engine::Storage::ReplDataManager::GetFullReplDataInfo(svr->storage, &files); if (!s.IsOK()) { - s = util::SockSend(repl_fd, "-ERR can't create db checkpoint"); + s = util::SockSend(repl_fd, "-ERR can't create db checkpoint", bev); if (!s.IsOK()) { LOG(WARNING) << "[replication] Failed to send error response: " << s.Msg(); } @@ -237,7 +237,7 @@ class CommandFetchMeta : public Commander { return; } // Send full data file info - if (util::SockSend(repl_fd, files + CRLF).IsOK()) { + if (util::SockSend(repl_fd, files + CRLF, bev).IsOK()) { LOG(INFO) << "[replication] Succeed sending full data file info to " << ip; } else { LOG(WARNING) << "[replication] Fail to send full data file info " << ip << ", error: " << strerror(errno); @@ -291,8 +291,8 @@ class CommandFetchFile : public Commander { if (!fd) break; // Send file size and content - if (util::SockSend(repl_fd, std::to_string(file_size) + CRLF).IsOK() && - util::SockSendFile(repl_fd, *fd, file_size).IsOK()) { + if (util::SockSend(repl_fd, std::to_string(file_size) + CRLF, bev).IsOK() && + util::SockSendFile(repl_fd, *fd, file_size, bev).IsOK()) { LOG(INFO) << "[replication] Succeed sending file " << file << " to " << ip; } else { LOG(WARNING) << "[replication] Fail to send file " << file << " to " << ip << ", error: " << strerror(errno); diff --git a/src/common/io_util.cc b/src/common/io_util.cc index 42beb4127f0..643d58d1a80 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -203,7 +203,7 @@ StatusOr SockConnect(const std::string &host, uint32_t port, int conn_timeo // NOTE: fd should be blocking here Status SockSend(int fd, const std::string &data) { return Write(fd, data); } -// Implements SockSendFileCore to transfer data between file descriptors and +// Implements SockSendFileImpl to transfer data between file descriptors and // avoid transferring data to and from user space. // // The function prototype is just like sendfile(2) on Linux. in_fd is a file @@ -213,7 +213,7 @@ Status SockSend(int fd, const std::string &data) { return Write(fd, data); } // // The return value is the number of bytes written to out_fd, if the transfer // was successful. On error, -1 is returned, and errno is set appropriately. -ssize_t SockSendFileCore(int out_fd, int in_fd, off_t offset, size_t count) { +ssize_t SendFileImpl(int out_fd, int in_fd, off_t offset, size_t count) { #if defined(__linux__) return sendfile(out_fd, in_fd, &offset, count); @@ -224,18 +224,19 @@ ssize_t SockSendFileCore(int out_fd, int in_fd, off_t offset, size_t count) { else return (ssize_t)len; -#endif +#else errno = ENOSYS; return -1; + +#endif } -// Send file by sendfile actually according to different operation systems, -// please note that, the out socket fd should be in blocking mode. -Status SockSendFile(int out_fd, int in_fd, size_t size) { +template +Status SockSendFileImpl(FD out_fd, int in_fd, size_t size, Args... args) { off_t offset = 0; while (size != 0) { size_t n = size <= 16 * 1024 ? size : 16 * 1024; - ssize_t nwritten = SockSendFileCore(out_fd, in_fd, offset, n); + ssize_t nwritten = F(out_fd, in_fd, offset, n, args...); if (nwritten == -1) { if (errno == EINTR) continue; @@ -248,6 +249,30 @@ Status SockSendFile(int out_fd, int in_fd, size_t size) { return Status::OK(); } +// Send file by sendfile actually according to different operation systems, +// please note that, the out socket fd should be in blocking mode. +Status SockSendFile(int out_fd, int in_fd, size_t size) { return SockSendFileImpl(out_fd, in_fd, size); } + +Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl) { +#ifdef ENABLE_OPENSSL + if (ssl) { + return SockSendFileImpl(ssl, in_fd, size, 0); + } else { + return SockSendFile(out_fd, in_fd, size); + } +#else + return SockSendFile(out_fd, in_fd, size); +#endif +} + +Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev) { +#ifdef ENABLE_OPENSSL + return SockSendFile(out_fd, in_fd, size, bufferevent_openssl_get_ssl(bev)); +#else + return SockSendFile(out_fd, in_fd, size); +#endif +} + Status SockSetBlocking(int fd, int blocking) { int flags = 0; // Old flags diff --git a/src/common/io_util.h b/src/common/io_util.h index d9bae04bc56..2117e077fc5 100644 --- a/src/common/io_util.h +++ b/src/common/io_util.h @@ -55,8 +55,10 @@ Status Pwrite(int fd, const std::string &data, off_t offset); Status SockSend(int fd, const std::string &data, ssl_st *ssl); Status SockSend(int fd, const std::string &data, bufferevent *bev); -StatusOr SockConnect(const std::string &host, uint32_t port, ssl_st *ssl, int conn_timeout = 0, int timeout = 0); +Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl); +Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev); +StatusOr SockConnect(const std::string &host, uint32_t port, ssl_st *ssl, int conn_timeout = 0, int timeout = 0); StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, ssl_st *ssl); } // namespace util diff --git a/tests/gocase/tls/tls_test.go b/tests/gocase/tls/tls_test.go index 95520d35d57..e9b200cb14b 100644 --- a/tests/gocase/tls/tls_test.go +++ b/tests/gocase/tls/tls_test.go @@ -165,6 +165,7 @@ func TestTLSReplica(t *testing.T) { defer func() { require.NoError(t, rc.Close()) }() t.Run("TLS: Replication (incremental)", func(t *testing.T) { + time.Sleep(500 * time.Millisecond) require.Equal(t, rc.Get(ctx, "a").Val(), "") require.Equal(t, rc.Get(ctx, "b").Val(), "") require.NoError(t, sc.Set(ctx, "a", "1", 0).Err()) From 7afa4a9731612171c843e2e74b6a402719b5fb2a Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 20:30:19 +0800 Subject: [PATCH 08/15] fix --- src/common/io_util.cc | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/common/io_util.cc b/src/common/io_util.cc index 643d58d1a80..da88d66ef6c 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -231,11 +231,29 @@ ssize_t SendFileImpl(int out_fd, int in_fd, off_t offset, size_t count) { #endif } +#ifdef ENABLE_OPENSSL +ssize_t SendFileSSLImpl(ssl_st *ssl, int in_fd, off_t offset, size_t count) { + constexpr size_t BUFFER_SIZE = 16 * 1024; + char buf[BUFFER_SIZE]; + if (off_t ret = lseek(in_fd, offset, SEEK_SET); ret == -1) { + return -1; + } + count = count <= BUFFER_SIZE ? count : BUFFER_SIZE; + if (ssize_t ret = read(in_fd, buf, count); ret == -1) { + return -1; + } else { + count = ret; + } + return SSL_write(ssl, buf, (int)count); +} +#endif + template Status SockSendFileImpl(FD out_fd, int in_fd, size_t size, Args... args) { + constexpr size_t BUFFER_SIZE = 16 * 1024; off_t offset = 0; while (size != 0) { - size_t n = size <= 16 * 1024 ? size : 16 * 1024; + size_t n = size <= BUFFER_SIZE ? size : BUFFER_SIZE; ssize_t nwritten = F(out_fd, in_fd, offset, n, args...); if (nwritten == -1) { if (errno == EINTR) @@ -256,7 +274,7 @@ Status SockSendFile(int out_fd, int in_fd, size_t size) { return SockSendFileImp Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl) { #ifdef ENABLE_OPENSSL if (ssl) { - return SockSendFileImpl(ssl, in_fd, size, 0); + return SockSendFileImpl(ssl, in_fd, size); } else { return SockSendFile(out_fd, in_fd, size); } From 5529f000f7ac6f483ad67653ddb4261359bc171f Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 23:21:58 +0800 Subject: [PATCH 09/15] fix --- src/common/io_util.cc | 24 ++++-------------------- tests/gocase/tls/tls_test.go | 4 ++-- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/src/common/io_util.cc b/src/common/io_util.cc index da88d66ef6c..3519f942f96 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -275,12 +275,9 @@ Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl) { #ifdef ENABLE_OPENSSL if (ssl) { return SockSendFileImpl(ssl, in_fd, size); - } else { - return SockSendFile(out_fd, in_fd, size); } -#else - return SockSendFile(out_fd, in_fd, size); #endif + return SockSendFile(out_fd, in_fd, size); } Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev) { @@ -460,12 +457,9 @@ Status SockSend(int fd, const std::string &data, ssl_st *ssl) { #ifdef ENABLE_OPENSSL if (ssl) { return WriteImpl(ssl, data); - } else { - return SockSend(fd, data); } -#else - return SockSend(fd, data); #endif + return SockSend(fd, data); } Status SockSend(int fd, const std::string &data, bufferevent *bev) { @@ -491,12 +485,9 @@ StatusOr SockConnect(const std::string &host, uint32_t port, ssl_st *ssl, i } return fd; - } else { - return SockConnect(host, port, conn_timeout, timeout); } -#else - return SockConnect(host, port, conn_timeout, timeout); #endif + return SockConnect(host, port, conn_timeout, timeout); } StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, ssl_st *ssl) { @@ -517,20 +508,13 @@ StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, ssl_s } return howmuch; - } else { - if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) { - return ret; - } else { - return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; - } } -#else +#endif if (int ret = evbuffer_read(buf, fd, howmuch); ret > 0) { return ret; } else { return {Status::NotOK, fmt::format("failed to read from socket: {}", strerror(errno))}; } -#endif } } // namespace util diff --git a/tests/gocase/tls/tls_test.go b/tests/gocase/tls/tls_test.go index e9b200cb14b..f23fc97c9e3 100644 --- a/tests/gocase/tls/tls_test.go +++ b/tests/gocase/tls/tls_test.go @@ -170,7 +170,7 @@ func TestTLSReplica(t *testing.T) { require.Equal(t, rc.Get(ctx, "b").Val(), "") require.NoError(t, sc.Set(ctx, "a", "1", 0).Err()) require.NoError(t, sc.Set(ctx, "b", "2", 0).Err()) - time.Sleep(500 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) require.Equal(t, rc.Get(ctx, "a").Val(), "1") require.Equal(t, rc.Get(ctx, "b").Val(), "2") }) @@ -186,7 +186,7 @@ func TestTLSReplica(t *testing.T) { t.Run("TLS: Replication (full)", func(t *testing.T) { require.NoError(t, sc.Set(ctx, "c", "3", 0).Err()) - time.Sleep(500 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) require.Equal(t, rc2.Get(ctx, "a").Val(), "1") require.Equal(t, rc2.Get(ctx, "b").Val(), "2") require.Equal(t, rc2.Get(ctx, "c").Val(), "3") From 5a774169775f55b27d41879dfee2aaffdfe076d9 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 3 Aug 2023 23:44:17 +0800 Subject: [PATCH 10/15] remove header include --- src/common/io_util.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/common/io_util.h b/src/common/io_util.h index 2117e077fc5..9fafbc7bdfc 100644 --- a/src/common/io_util.h +++ b/src/common/io_util.h @@ -22,7 +22,6 @@ #include -#include "event2/util.h" #include "status.h" // forward declarations @@ -59,6 +58,6 @@ Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl); Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev); StatusOr SockConnect(const std::string &host, uint32_t port, ssl_st *ssl, int conn_timeout = 0, int timeout = 0); -StatusOr EvbufferRead(evbuffer *buf, evutil_socket_t fd, int howmuch, ssl_st *ssl); +StatusOr EvbufferRead(evbuffer *buf, int fd, int howmuch, ssl_st *ssl); } // namespace util From 68d3709a8d4d1a22530e2fbab1ce16961b8e5e9a Mon Sep 17 00:00:00 2001 From: Twice Date: Tue, 8 Aug 2023 07:38:14 +0800 Subject: [PATCH 11/15] fix conflict --- src/server/worker.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 52a7cee63f8..5af707b29de 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -132,7 +132,7 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock ssl_st *ssl = nullptr; #ifdef ENABLE_OPENSSL if (uint32_t(local_port) == svr->GetConfig()->tls_port) { - ssl = SSL_new(worker->svr->ssl_ctx.get()); + ssl = SSL_new(svr->ssl_ctx.get()); if (!ssl) { LOG(ERROR) << "Failed to construct SSL structure for new connection: " << SSLErrors{}; evutil_closesocket(fd); From cc5cb977c245082810226358ce2b1d82a8888bc9 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Tue, 15 Aug 2023 13:27:01 +0800 Subject: [PATCH 12/15] fix go test --- tests/gocase/tls/tls_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/gocase/tls/tls_test.go b/tests/gocase/tls/tls_test.go index f23fc97c9e3..508d5c484b7 100644 --- a/tests/gocase/tls/tls_test.go +++ b/tests/gocase/tls/tls_test.go @@ -165,16 +165,18 @@ func TestTLSReplica(t *testing.T) { defer func() { require.NoError(t, rc.Close()) }() t.Run("TLS: Replication (incremental)", func(t *testing.T) { - time.Sleep(500 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) require.Equal(t, rc.Get(ctx, "a").Val(), "") require.Equal(t, rc.Get(ctx, "b").Val(), "") require.NoError(t, sc.Set(ctx, "a", "1", 0).Err()) require.NoError(t, sc.Set(ctx, "b", "2", 0).Err()) - time.Sleep(1000 * time.Millisecond) + util.WaitForSync(t, rc) require.Equal(t, rc.Get(ctx, "a").Val(), "1") require.Equal(t, rc.Get(ctx, "b").Val(), "2") }) + require.NoError(t, sc.Set(ctx, "c", "3", 0).Err()) + replica2 := util.StartTLSServer(t, map[string]string{ "tls-replication": "yes", "slaveof": fmt.Sprintf("%s %d", srv.Host(), srv.TLSPort()), @@ -185,11 +187,9 @@ func TestTLSReplica(t *testing.T) { defer func() { require.NoError(t, rc2.Close()) }() t.Run("TLS: Replication (full)", func(t *testing.T) { - require.NoError(t, sc.Set(ctx, "c", "3", 0).Err()) - time.Sleep(1000 * time.Millisecond) + util.WaitForSync(t, rc2) require.Equal(t, rc2.Get(ctx, "a").Val(), "1") require.Equal(t, rc2.Get(ctx, "b").Val(), "2") require.Equal(t, rc2.Get(ctx, "c").Val(), "3") - require.Equal(t, rc.Get(ctx, "c").Val(), "3") }) } From c47e32bff328187042fc7b894e1c0d7babe25216 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Tue, 15 Aug 2023 16:38:44 +0800 Subject: [PATCH 13/15] fix ssl ctx init --- src/server/server.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/server.cc b/src/server/server.cc index 04e349b1206..1ff57959c2c 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -66,7 +66,7 @@ Server::Server(engine::Storage *storage, Config *config) #ifdef ENABLE_OPENSSL // init ssl context - if (config->tls_port) { + if (config->tls_port || config->tls_replication) { ssl_ctx = CreateSSLContext(config); if (!ssl_ctx) { exit(1); From 3b6ab618c07b6749e33e447ba1780da2e47c82cd Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Wed, 16 Aug 2023 11:51:19 +0800 Subject: [PATCH 14/15] fix --- src/main.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.cc b/src/main.cc index fc3b4611984..5d83f1322e0 100644 --- a/src/main.cc +++ b/src/main.cc @@ -349,7 +349,7 @@ int main(int argc, char *argv[]) { #ifdef ENABLE_OPENSSL // initialize OpenSSL - if (config.tls_port) { + if (config.tls_port || config.tls_replication) { InitSSL(); } #endif From 5f6c78176a06dcbd7a659073d41b83afdbd033c6 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Mon, 11 Sep 2023 22:31:08 +0900 Subject: [PATCH 15/15] fix --- tests/gocase/tls/tls_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/gocase/tls/tls_test.go b/tests/gocase/tls/tls_test.go index 508d5c484b7..460f53c4601 100644 --- a/tests/gocase/tls/tls_test.go +++ b/tests/gocase/tls/tls_test.go @@ -170,7 +170,7 @@ func TestTLSReplica(t *testing.T) { require.Equal(t, rc.Get(ctx, "b").Val(), "") require.NoError(t, sc.Set(ctx, "a", "1", 0).Err()) require.NoError(t, sc.Set(ctx, "b", "2", 0).Err()) - util.WaitForSync(t, rc) + util.WaitForOffsetSync(t, sc, rc) require.Equal(t, rc.Get(ctx, "a").Val(), "1") require.Equal(t, rc.Get(ctx, "b").Val(), "2") }) @@ -187,7 +187,7 @@ func TestTLSReplica(t *testing.T) { defer func() { require.NoError(t, rc2.Close()) }() t.Run("TLS: Replication (full)", func(t *testing.T) { - util.WaitForSync(t, rc2) + util.WaitForOffsetSync(t, sc, rc2) require.Equal(t, rc2.Get(ctx, "a").Val(), "1") require.Equal(t, rc2.Get(ctx, "b").Val(), "2") require.Equal(t, rc2.Get(ctx, "c").Val(), "3")