From 1dd0df0a9670ff078c8ca39db35cfdf83acd8827 Mon Sep 17 00:00:00 2001 From: git-hulk Date: Sun, 8 Sep 2024 15:00:56 +0800 Subject: [PATCH] Fix didn't reload namespaces from DB after the full sync Namespaces will store inside DB if the repl-namespace-enabled was set to yes. And then replicas will intercept and parse the increment replication log to see if it needs to reload namespaces. But it won't have the namespace update log when doing the full sync, so we need to forcely reload from DB once the full replication was done. --- src/cluster/replication.cc | 7 ++++ src/server/namespace.cc | 3 +- src/server/namespace.h | 4 +- tests/gocase/unit/namespace/namespace_test.go | 39 +++++++++++++++++++ 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index 3de51a94047..29da294169f 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -732,6 +732,13 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) { LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was finish"; post_fullsync_cb_(); + // It needs to reload namespaces from DB after the full sync is done, + // or namespaces are not visible in the replica. + s = srv_->GetNamespace()->LoadAndRewrite(); + if (!s.IsOK()) { + LOG(ERROR) << "[replication] Failed to load and rewrite namespace: " << s.Msg(); + } + // Switch to psync state machine again psync_steps_.Start(); return CBState::QUIT; diff --git a/src/server/namespace.cc b/src/server/namespace.cc index e10c08dc29b..d1f7c228483 100644 --- a/src/server/namespace.cc +++ b/src/server/namespace.cc @@ -54,7 +54,8 @@ bool Namespace::IsAllowModify() const { Status Namespace::loadFromDB(std::map* db_tokens) const { std::string value; engine::Context ctx(storage_); - auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf_, kNamespaceDBKey, &value); + auto cf = storage_->GetCFHandle(ColumnFamilyID::Propagate); + auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf, kNamespaceDBKey, &value); if (!s.ok()) { if (s.IsNotFound()) return Status::OK(); return {Status::NotOK, s.ToString()}; diff --git a/src/server/namespace.h b/src/server/namespace.h index 3e22d382419..d2b45cf3d2a 100644 --- a/src/server/namespace.h +++ b/src/server/namespace.h @@ -26,8 +26,7 @@ constexpr const char *kNamespaceDBKey = "__namespace_keys__"; class Namespace { public: - explicit Namespace(engine::Storage *storage) - : storage_(storage), cf_(storage_->GetCFHandle(ColumnFamilyID::Propagate)) {} + explicit Namespace(engine::Storage *storage) : storage_(storage) {} ~Namespace() = default; Namespace(const Namespace &) = delete; @@ -45,7 +44,6 @@ class Namespace { private: engine::Storage *storage_; - rocksdb::ColumnFamilyHandle *cf_ = nullptr; std::shared_mutex tokens_mu_; // mapping from token to namespace name diff --git a/tests/gocase/unit/namespace/namespace_test.go b/tests/gocase/unit/namespace/namespace_test.go index 9409b61a5a7..ff11af84757 100644 --- a/tests/gocase/unit/namespace/namespace_test.go +++ b/tests/gocase/unit/namespace/namespace_test.go @@ -21,6 +21,8 @@ package namespace import ( "context" + "fmt" + "strings" "sync" "testing" "time" @@ -252,6 +254,43 @@ func TestNamespaceReplicate(t *testing.T) { }) } +func TestNamespaceReplicateWithFullSync(t *testing.T) { + config := map[string]string{ + "rocksdb.write_buffer_size": "4", + "rocksdb.target_file_size_base": "16", + "rocksdb.max_write_buffer_number": "1", + "rocksdb.wal_ttl_seconds": "0", + "rocksdb.wal_size_limit_mb": "0", + "repl-namespace-enabled": "yes", + "requirepass": "123", + "masterauth": "123", + } + master := util.StartServer(t, config) + defer master.Close() + masterClient := master.NewClientWithOption(&redis.Options{Password: "123"}) + defer func() { require.NoError(t, masterClient.Close()) }() + + slave := util.StartServer(t, config) + defer slave.Close() + slaveClient := slave.NewClientWithOption(&redis.Options{Password: "123"}) + defer func() { require.NoError(t, slaveClient.Close()) }() + + ctx := context.Background() + value := strings.Repeat("a", 128*1024) + for i := 0; i < 1024; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), value, 0).Err()) + } + require.NoError(t, masterClient.Do(ctx, "NAMESPACE", "ADD", "foo", "bar").Err()) + + util.SlaveOf(t, slaveClient, master) + util.WaitForOffsetSync(t, masterClient, slaveClient, 60*time.Second) + + // Namespaces should be replicated after the full sync + token, err := slaveClient.Do(ctx, "NAMESPACE", "GET", "foo").Result() + require.NoError(t, err) + require.EqualValues(t, "bar", token) +} + func TestNamespaceRewrite(t *testing.T) { password := "pwd" srv := util.StartServerWithCLIOptions(t, false, map[string]string{