From fe94ab293084de9192da2d5c0723b426205dd18a Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Mon, 4 Nov 2024 21:44:04 +0800 Subject: [PATCH 01/18] feat(coordinator): support update service info when new term --- coordinator/impl/coordinator.go | 6 ++ coordinator/impl/coordinator_e2e_test.go | 76 +++++++++++++++++++++++ coordinator/impl/shard_controller.go | 15 +++++ coordinator/impl/shard_controller_test.go | 4 ++ 4 files changed, 101 insertions(+) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index 97367a2f..c978de9c 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -55,6 +55,8 @@ type Coordinator interface { NodeAvailabilityListener ClusterStatus() model.ClusterStatus + + FindServerByInternalAddress(internalAddress string) *model.ServerAddress } type coordinator struct { @@ -512,6 +514,10 @@ func (c *coordinator) rebalanceCluster() error { return nil } +func (c *coordinator) FindServerByInternalAddress(internalAddress string) *model.ServerAddress { + return c.findServerByInternalAddress(c.ClusterConfig, internalAddress) +} + func (*coordinator) findServerByInternalAddress(newClusterConfig model.ClusterConfig, server string) *model.ServerAddress { for _, s := range newClusterConfig.Servers { if server == s.Internal { diff --git a/coordinator/impl/coordinator_e2e_test.go b/coordinator/impl/coordinator_e2e_test.go index b62f6ecc..b8435902 100644 --- a/coordinator/impl/coordinator_e2e_test.go +++ b/coordinator/impl/coordinator_e2e_test.go @@ -19,6 +19,7 @@ import ( "fmt" "log/slog" "math" + "strings" "sync" "testing" "time" @@ -769,3 +770,78 @@ func checkServerLists(t *testing.T, expected, actual []model.ServerAddress) { assert.True(t, ok) } } + +func TestCoordinator_RefreshServerInfo(t *testing.T) { + s1, sa1 := newServer(t) + s2, sa2 := newServer(t) + s3, sa3 := newServer(t) + + metadataProvider := NewMetadataProviderMemory() + clusterConfig := model.ClusterConfig{ + Namespaces: []model.NamespaceConfig{{ + Name: "my-ns-1", + ReplicationFactor: 3, + InitialShardCount: 1, + }}, + Servers: []model.ServerAddress{sa1, sa2, sa3}, + } + configChangesCh := make(chan any) + c, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { + return clusterConfig, nil + }, configChangesCh, + NewRpcProvider(common.NewClientPool(nil, nil))) + assert.NoError(t, err) + + // wait for all shards to be ready + assert.Eventually(t, func() bool { + for _, ns := range c.ClusterStatus().Namespaces { + for _, shard := range ns.Shards { + if shard.Status != model.ShardStatusSteadyState { + return false + } + } + } + return true + }, 10*time.Second, 10*time.Millisecond) + + // change the localhost to 127.0.0.1 + var clusterServer []model.ServerAddress + for _, sv := range clusterConfig.Servers { + clusterServer = append(clusterServer, model.ServerAddress{ + Public: strings.Replace(sv.Public, "localhost", "127.0.0.1", -1), + Internal: sv.Internal, + }) + } + + clusterConfig.Servers = clusterServer + configChangesCh <- nil + + // new term + coordinatorInstance := c.(*coordinator) + controller := coordinatorInstance.shardControllers[0] + controllerInstance := controller.(*shardController) + controllerInstance.electLeaderWithRetries() + + assert.Eventually(t, func() bool { + for _, ns := range c.ClusterStatus().Namespaces { + for _, shard := range ns.Shards { + if shard.Status != model.ShardStatusSteadyState { + return false + } + for _, sv := range shard.Ensemble { + if !strings.HasPrefix(sv.Public, "127.0.0.1") { + return false + } + } + } + } + return true + }, 10*time.Second, 10*time.Millisecond) + + err = s1.Close() + assert.NoError(t, err) + err = s2.Close() + assert.NoError(t, err) + err = s3.Close() + assert.NoError(t, err) +} diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index cb760060..bc16dfa8 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -295,6 +295,8 @@ func (s *shardController) electLeader() error { s.shardMetadata.Status = model.ShardStatusElection s.shardMetadata.Leader = nil s.shardMetadata.Term++ + // it's a safe point to update the service info + s.shardMetadata.Ensemble = s.getRefreshedEnsemble() s.shardMetadataMutex.Unlock() s.log.Info( @@ -369,6 +371,19 @@ func (s *shardController) electLeader() error { return nil } +func (s *shardController) getRefreshedEnsemble() []model.ServerAddress { + refreshedEnsembleServiceInfo := make([]model.ServerAddress, 0) + for _, currentServer := range s.shardMetadata.Ensemble { + logicalNodeId := currentServer.Internal + if refreshedServiceInfo := s.coordinator.FindServerByInternalAddress(logicalNodeId); refreshedServiceInfo != nil { + refreshedEnsembleServiceInfo = append(refreshedEnsembleServiceInfo, *refreshedServiceInfo) + continue + } + refreshedEnsembleServiceInfo = append(refreshedEnsembleServiceInfo, currentServer) + } + return refreshedEnsembleServiceInfo +} + func (s *shardController) deletingRemovedNodes() error { for _, ds := range s.shardMetadata.RemovedNodes { if _, err := s.rpc.DeleteShard(s.ctx, ds, &proto.DeleteShardRequest{ diff --git a/coordinator/impl/shard_controller_test.go b/coordinator/impl/shard_controller_test.go index d798cd7d..a2148ee5 100644 --- a/coordinator/impl/shard_controller_test.go +++ b/coordinator/impl/shard_controller_test.go @@ -365,6 +365,10 @@ func (m *mockCoordinator) WaitForNextUpdate(ctx context.Context, currentValue *p panic("not implemented") } +func (m *mockCoordinator) FindServerByInternalAddress(internalAddress string) *model.ServerAddress { + return nil +} + func (m *mockCoordinator) InitiateLeaderElection(namespace string, shard int64, metadata model.ShardMetadata) error { m.Lock() defer m.Unlock() From 5060d05c20ac7e433489bb11ad827c5dc17ed2e4 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Mon, 4 Nov 2024 22:12:41 +0800 Subject: [PATCH 02/18] fix lint --- coordinator/impl/coordinator_e2e_test.go | 4 ++-- oxia/options_list.go | 2 +- oxia/sync_client_impl_test.go | 3 ++- server/kv/db_test.go | 3 ++- server/kv/kv_pebble.go | 3 ++- server/wal/codec/codec.go | 4 ++-- server/wal/codec/v1.go | 7 ++++--- server/wal/codec/v1_test.go | 3 ++- server/wal/codec/v2.go | 9 +++++---- server/wal/codec/v2_test.go | 3 ++- server/wal/wal_ro_segment.go | 2 +- server/wal/wal_ro_segment_test.go | 3 ++- server/wal/wal_rw_segment_test.go | 3 ++- 13 files changed, 29 insertions(+), 20 deletions(-) diff --git a/coordinator/impl/coordinator_e2e_test.go b/coordinator/impl/coordinator_e2e_test.go index b8435902..ff162776 100644 --- a/coordinator/impl/coordinator_e2e_test.go +++ b/coordinator/impl/coordinator_e2e_test.go @@ -805,10 +805,10 @@ func TestCoordinator_RefreshServerInfo(t *testing.T) { }, 10*time.Second, 10*time.Millisecond) // change the localhost to 127.0.0.1 - var clusterServer []model.ServerAddress + clusterServer := make([]model.ServerAddress, 0) for _, sv := range clusterConfig.Servers { clusterServer = append(clusterServer, model.ServerAddress{ - Public: strings.Replace(sv.Public, "localhost", "127.0.0.1", -1), + Public: strings.ReplaceAll(sv.Public, "localhost", "127.0.0.1"), Internal: sv.Internal, }) } diff --git a/oxia/options_list.go b/oxia/options_list.go index a3add2fb..19260eb1 100644 --- a/oxia/options_list.go +++ b/oxia/options_list.go @@ -47,7 +47,7 @@ func (u *useIndex) applyRangeScan(opts *rangeScanOptions) { } // UseIndex let the users specify a different index to follow for the -// Note: The returned list will contain they primary keys of the records +// Note: The returned list will contain they primary keys of the records. func UseIndex(indexName string) ListOption { return &useIndex{indexName} } diff --git a/oxia/sync_client_impl_test.go b/oxia/sync_client_impl_test.go index 6fec5645..bc4b4186 100644 --- a/oxia/sync_client_impl_test.go +++ b/oxia/sync_client_impl_test.go @@ -17,11 +17,12 @@ package oxia import ( "context" "fmt" - "github.com/streamnative/oxia/server" "log/slog" "strings" "testing" + "github.com/streamnative/oxia/server" + "github.com/stretchr/testify/assert" ) diff --git a/server/kv/db_test.go b/server/kv/db_test.go index 55a38ad3..0ee43f26 100644 --- a/server/kv/db_test.go +++ b/server/kv/db_test.go @@ -16,11 +16,12 @@ package kv import ( "fmt" - "github.com/google/uuid" "os" "path" "testing" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" pb "google.golang.org/protobuf/proto" diff --git a/server/kv/kv_pebble.go b/server/kv/kv_pebble.go index 37ef0f77..c404eaaa 100644 --- a/server/kv/kv_pebble.go +++ b/server/kv/kv_pebble.go @@ -16,7 +16,6 @@ package kv import ( "fmt" - "golang.org/x/net/context" "io" "log/slog" "os" @@ -24,6 +23,8 @@ import ( "sync/atomic" "time" + "golang.org/x/net/context" + "github.com/cockroachdb/pebble/bloom" "github.com/streamnative/oxia/common/compare" diff --git a/server/wal/codec/codec.go b/server/wal/codec/codec.go index 45506e0d..4beb34a3 100644 --- a/server/wal/codec/codec.go +++ b/server/wal/codec/codec.go @@ -117,7 +117,7 @@ type Codec interface { // - buf: the byte slice containing the raw data. // - startFileOffset: the starting file offset from which recovery begins. // - baseEntryOffset: the base offset for the index entries, used to adjust entry offsets. - // - commitOffset: a pointer to the commit offset, which is using for auto-discard uncommited corruption data + // - commitOffset: a pointer to the commit offset, which is using for auto-discard uncommitted corruption data // // Returns: // - index: the recovered index data as a byte slice. @@ -165,7 +165,7 @@ func ReadInt(b []byte, offset uint32) uint32 { return binary.BigEndian.Uint32(b[offset : offset+4]) } -// Index buf +// Index buf. var bufferPool = sync.Pool{} const initialIndexBufferCapacity = 16 * 1024 diff --git a/server/wal/codec/v1.go b/server/wal/codec/v1.go index ed7dd263..e0e3e3e8 100644 --- a/server/wal/codec/v1.go +++ b/server/wal/codec/v1.go @@ -16,10 +16,11 @@ package codec import ( "encoding/binary" - "github.com/pkg/errors" - "go.uber.org/multierr" "io" "os" + + "github.com/pkg/errors" + "go.uber.org/multierr" ) // Txn File: @@ -33,7 +34,7 @@ import ( // +----------------+----------------+----------------+ // | Index(4 Bytes) | Index(4 Bytes) | ... | // +----------------+----------------+----------------+ -// Index: The file offset index +// Index: The file offset index. var _ Codec = &V1{} const v1PayloadSizeLen uint32 = 4 diff --git a/server/wal/codec/v1_test.go b/server/wal/codec/v1_test.go index 7188a154..62568c51 100644 --- a/server/wal/codec/v1_test.go +++ b/server/wal/codec/v1_test.go @@ -16,11 +16,12 @@ package codec import ( "encoding/binary" - "github.com/google/uuid" "os" "path" "testing" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" ) diff --git a/server/wal/codec/v2.go b/server/wal/codec/v2.go index 0d94b18e..5680baf2 100644 --- a/server/wal/codec/v2.go +++ b/server/wal/codec/v2.go @@ -16,11 +16,12 @@ package codec import ( "encoding/binary" - "go.uber.org/multierr" "io" "log/slog" "os" + "go.uber.org/multierr" + "github.com/pkg/errors" "github.com/streamnative/oxia/server/util/crc" @@ -40,7 +41,7 @@ import ( // | CRC(4Bytes) | Index(4B) | Index(4B) | ... | // +--------------+-----------+-----------+-----+ // CRC: 32bit hash computed over the payload using CRC. -// Index: The file offset index +// Index: The file offset index. var _ Codec = &V2{} const v2PayloadSizeLen uint32 = 4 @@ -217,8 +218,8 @@ func (v *V2) RecoverIndex(buf []byte, startFileOffset uint32, baseEntryOffset in // data corruption if errors.Is(err, ErrOffsetOutOfBounds) || errors.Is(err, ErrDataCorrupted) { if commitOffset != nil && currentEntryOffset > *commitOffset { - // uncommited data corruption, simply discard it - slog.Warn("discard the corrupted uncommited data.", + // uncommitted data corruption, simply discard it + slog.Warn("discard the corrupted uncommitted data.", slog.Int64("entryId", currentEntryOffset), slog.Any("error", err)) break } diff --git a/server/wal/codec/v2_test.go b/server/wal/codec/v2_test.go index a1fa0d78..9bd03096 100644 --- a/server/wal/codec/v2_test.go +++ b/server/wal/codec/v2_test.go @@ -16,11 +16,12 @@ package codec import ( "encoding/binary" - "github.com/google/uuid" "os" "path" "testing" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" "github.com/streamnative/oxia/server/util/crc" diff --git a/server/wal/wal_ro_segment.go b/server/wal/wal_ro_segment.go index 41c8dff7..c49f4a8e 100644 --- a/server/wal/wal_ro_segment.go +++ b/server/wal/wal_ro_segment.go @@ -329,7 +329,7 @@ func (r *readOnlySegmentsGroup) PollHighestSegment() (common.RefCount[ReadOnlySe defer r.Unlock() if r.allSegments.Empty() { - return nil, nil // nolint: nilnil + return nil, nil //nolint: nilnil } offset, _ := r.allSegments.Max() diff --git a/server/wal/wal_ro_segment_test.go b/server/wal/wal_ro_segment_test.go index ae93cde1..30d56640 100644 --- a/server/wal/wal_ro_segment_test.go +++ b/server/wal/wal_ro_segment_test.go @@ -16,10 +16,11 @@ package wal import ( "fmt" - "github.com/google/uuid" "os" "testing" + "github.com/google/uuid" + "github.com/streamnative/oxia/server/wal/codec" "github.com/stretchr/testify/assert" diff --git a/server/wal/wal_rw_segment_test.go b/server/wal/wal_rw_segment_test.go index 4d4a3120..66c19c0a 100644 --- a/server/wal/wal_rw_segment_test.go +++ b/server/wal/wal_rw_segment_test.go @@ -16,9 +16,10 @@ package wal import ( "encoding/binary" - "github.com/streamnative/oxia/server/wal/codec" "testing" + "github.com/streamnative/oxia/server/wal/codec" + "github.com/stretchr/testify/assert" ) From 174dba8c8060d7cbf3b9eefc60979b45c3260d2a Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Mon, 4 Nov 2024 22:19:05 +0800 Subject: [PATCH 03/18] revert useless changes --- oxia/options_list.go | 2 +- oxia/sync_client_impl_test.go | 3 +-- server/kv/db_test.go | 3 +-- server/kv/kv_pebble.go | 3 +-- server/wal/codec/codec.go | 4 ++-- server/wal/codec/v1.go | 7 +++---- server/wal/codec/v1_test.go | 3 +-- server/wal/codec/v2.go | 9 ++++----- server/wal/codec/v2_test.go | 3 +-- server/wal/wal_ro_segment.go | 2 +- server/wal/wal_ro_segment_test.go | 3 +-- server/wal/wal_rw_segment_test.go | 3 +-- 12 files changed, 18 insertions(+), 27 deletions(-) diff --git a/oxia/options_list.go b/oxia/options_list.go index 19260eb1..a3add2fb 100644 --- a/oxia/options_list.go +++ b/oxia/options_list.go @@ -47,7 +47,7 @@ func (u *useIndex) applyRangeScan(opts *rangeScanOptions) { } // UseIndex let the users specify a different index to follow for the -// Note: The returned list will contain they primary keys of the records. +// Note: The returned list will contain they primary keys of the records func UseIndex(indexName string) ListOption { return &useIndex{indexName} } diff --git a/oxia/sync_client_impl_test.go b/oxia/sync_client_impl_test.go index bc4b4186..6fec5645 100644 --- a/oxia/sync_client_impl_test.go +++ b/oxia/sync_client_impl_test.go @@ -17,12 +17,11 @@ package oxia import ( "context" "fmt" + "github.com/streamnative/oxia/server" "log/slog" "strings" "testing" - "github.com/streamnative/oxia/server" - "github.com/stretchr/testify/assert" ) diff --git a/server/kv/db_test.go b/server/kv/db_test.go index 0ee43f26..55a38ad3 100644 --- a/server/kv/db_test.go +++ b/server/kv/db_test.go @@ -16,12 +16,11 @@ package kv import ( "fmt" + "github.com/google/uuid" "os" "path" "testing" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" pb "google.golang.org/protobuf/proto" diff --git a/server/kv/kv_pebble.go b/server/kv/kv_pebble.go index c404eaaa..37ef0f77 100644 --- a/server/kv/kv_pebble.go +++ b/server/kv/kv_pebble.go @@ -16,6 +16,7 @@ package kv import ( "fmt" + "golang.org/x/net/context" "io" "log/slog" "os" @@ -23,8 +24,6 @@ import ( "sync/atomic" "time" - "golang.org/x/net/context" - "github.com/cockroachdb/pebble/bloom" "github.com/streamnative/oxia/common/compare" diff --git a/server/wal/codec/codec.go b/server/wal/codec/codec.go index 4beb34a3..45506e0d 100644 --- a/server/wal/codec/codec.go +++ b/server/wal/codec/codec.go @@ -117,7 +117,7 @@ type Codec interface { // - buf: the byte slice containing the raw data. // - startFileOffset: the starting file offset from which recovery begins. // - baseEntryOffset: the base offset for the index entries, used to adjust entry offsets. - // - commitOffset: a pointer to the commit offset, which is using for auto-discard uncommitted corruption data + // - commitOffset: a pointer to the commit offset, which is using for auto-discard uncommited corruption data // // Returns: // - index: the recovered index data as a byte slice. @@ -165,7 +165,7 @@ func ReadInt(b []byte, offset uint32) uint32 { return binary.BigEndian.Uint32(b[offset : offset+4]) } -// Index buf. +// Index buf var bufferPool = sync.Pool{} const initialIndexBufferCapacity = 16 * 1024 diff --git a/server/wal/codec/v1.go b/server/wal/codec/v1.go index e0e3e3e8..ed7dd263 100644 --- a/server/wal/codec/v1.go +++ b/server/wal/codec/v1.go @@ -16,11 +16,10 @@ package codec import ( "encoding/binary" - "io" - "os" - "github.com/pkg/errors" "go.uber.org/multierr" + "io" + "os" ) // Txn File: @@ -34,7 +33,7 @@ import ( // +----------------+----------------+----------------+ // | Index(4 Bytes) | Index(4 Bytes) | ... | // +----------------+----------------+----------------+ -// Index: The file offset index. +// Index: The file offset index var _ Codec = &V1{} const v1PayloadSizeLen uint32 = 4 diff --git a/server/wal/codec/v1_test.go b/server/wal/codec/v1_test.go index 62568c51..7188a154 100644 --- a/server/wal/codec/v1_test.go +++ b/server/wal/codec/v1_test.go @@ -16,12 +16,11 @@ package codec import ( "encoding/binary" + "github.com/google/uuid" "os" "path" "testing" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" ) diff --git a/server/wal/codec/v2.go b/server/wal/codec/v2.go index 5680baf2..0d94b18e 100644 --- a/server/wal/codec/v2.go +++ b/server/wal/codec/v2.go @@ -16,12 +16,11 @@ package codec import ( "encoding/binary" + "go.uber.org/multierr" "io" "log/slog" "os" - "go.uber.org/multierr" - "github.com/pkg/errors" "github.com/streamnative/oxia/server/util/crc" @@ -41,7 +40,7 @@ import ( // | CRC(4Bytes) | Index(4B) | Index(4B) | ... | // +--------------+-----------+-----------+-----+ // CRC: 32bit hash computed over the payload using CRC. -// Index: The file offset index. +// Index: The file offset index var _ Codec = &V2{} const v2PayloadSizeLen uint32 = 4 @@ -218,8 +217,8 @@ func (v *V2) RecoverIndex(buf []byte, startFileOffset uint32, baseEntryOffset in // data corruption if errors.Is(err, ErrOffsetOutOfBounds) || errors.Is(err, ErrDataCorrupted) { if commitOffset != nil && currentEntryOffset > *commitOffset { - // uncommitted data corruption, simply discard it - slog.Warn("discard the corrupted uncommitted data.", + // uncommited data corruption, simply discard it + slog.Warn("discard the corrupted uncommited data.", slog.Int64("entryId", currentEntryOffset), slog.Any("error", err)) break } diff --git a/server/wal/codec/v2_test.go b/server/wal/codec/v2_test.go index 9bd03096..a1fa0d78 100644 --- a/server/wal/codec/v2_test.go +++ b/server/wal/codec/v2_test.go @@ -16,12 +16,11 @@ package codec import ( "encoding/binary" + "github.com/google/uuid" "os" "path" "testing" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" "github.com/streamnative/oxia/server/util/crc" diff --git a/server/wal/wal_ro_segment.go b/server/wal/wal_ro_segment.go index c49f4a8e..41c8dff7 100644 --- a/server/wal/wal_ro_segment.go +++ b/server/wal/wal_ro_segment.go @@ -329,7 +329,7 @@ func (r *readOnlySegmentsGroup) PollHighestSegment() (common.RefCount[ReadOnlySe defer r.Unlock() if r.allSegments.Empty() { - return nil, nil //nolint: nilnil + return nil, nil // nolint: nilnil } offset, _ := r.allSegments.Max() diff --git a/server/wal/wal_ro_segment_test.go b/server/wal/wal_ro_segment_test.go index 30d56640..ae93cde1 100644 --- a/server/wal/wal_ro_segment_test.go +++ b/server/wal/wal_ro_segment_test.go @@ -16,11 +16,10 @@ package wal import ( "fmt" + "github.com/google/uuid" "os" "testing" - "github.com/google/uuid" - "github.com/streamnative/oxia/server/wal/codec" "github.com/stretchr/testify/assert" diff --git a/server/wal/wal_rw_segment_test.go b/server/wal/wal_rw_segment_test.go index 66c19c0a..4d4a3120 100644 --- a/server/wal/wal_rw_segment_test.go +++ b/server/wal/wal_rw_segment_test.go @@ -16,9 +16,8 @@ package wal import ( "encoding/binary" - "testing" - "github.com/streamnative/oxia/server/wal/codec" + "testing" "github.com/stretchr/testify/assert" ) From 9339ae891831b466a79b0396fdd3eec52e249052 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Mon, 4 Nov 2024 22:48:09 +0800 Subject: [PATCH 04/18] close coordinator --- coordinator/impl/coordinator_e2e_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/coordinator/impl/coordinator_e2e_test.go b/coordinator/impl/coordinator_e2e_test.go index ff162776..11d92c85 100644 --- a/coordinator/impl/coordinator_e2e_test.go +++ b/coordinator/impl/coordinator_e2e_test.go @@ -844,4 +844,6 @@ func TestCoordinator_RefreshServerInfo(t *testing.T) { assert.NoError(t, err) err = s3.Close() assert.NoError(t, err) + err = c.Close() + assert.NoError(t, err) } From e4b67202eeafc7a753a37476b13ff24c48a3d55c Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 14:04:24 +0800 Subject: [PATCH 05/18] feat(coordinator): support update service info when new term --- coordinator/impl/coordinator_e2e_test.go | 2 +- coordinator/impl/shard_controller.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/coordinator/impl/coordinator_e2e_test.go b/coordinator/impl/coordinator_e2e_test.go index 11d92c85..28783ded 100644 --- a/coordinator/impl/coordinator_e2e_test.go +++ b/coordinator/impl/coordinator_e2e_test.go @@ -820,7 +820,7 @@ func TestCoordinator_RefreshServerInfo(t *testing.T) { coordinatorInstance := c.(*coordinator) controller := coordinatorInstance.shardControllers[0] controllerInstance := controller.(*shardController) - controllerInstance.electLeaderWithRetries() + controllerInstance.electionOp <- nil assert.Eventually(t, func() bool { for _, ns := range c.ClusterStatus().Namespaces { diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index bc16dfa8..939d0844 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -81,6 +81,7 @@ type shardController struct { rpc RpcProvider coordinator Coordinator + electionOp chan any deleteOp chan any nodeFailureOp chan model.ServerAddress swapNodeOp chan swapNodeRequest @@ -110,6 +111,7 @@ func NewShardController(namespace string, shard int64, namespaceConfig *model.Na shardMetadata: shardMetadata, rpc: rpc, coordinator: coordinator, + electionOp: make(chan any, chanBufferSize), deleteOp: make(chan any, chanBufferSize), nodeFailureOp: make(chan model.ServerAddress, chanBufferSize), swapNodeOp: make(chan swapNodeRequest, chanBufferSize), @@ -193,6 +195,9 @@ func (s *shardController) run() { case a := <-s.newTermAndAddFollowerOp: s.internalNewTermAndAddFollower(a.ctx, a.node, a.res) + + case <-s.electionOp: // for testing + s.electLeaderWithRetries() } } } From a53d726d2032048cb1b19ac8d3c62391f344037a Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 17:07:34 +0800 Subject: [PATCH 06/18] performance improvement --- coordinator/impl/coordinator.go | 11 ++++++++--- coordinator/impl/shard_controller.go | 20 ++++++++++++++------ coordinator/impl/shard_controller_test.go | 4 ++-- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index c978de9c..a90b10e5 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -54,13 +54,12 @@ type Coordinator interface { NodeAvailabilityListener + GetServers() []model.ServerAddress ClusterStatus() model.ClusterStatus - - FindServerByInternalAddress(internalAddress string) *model.ServerAddress } type coordinator struct { - sync.Mutex + sync.RWMutex assignmentsChanged common.ConditionContext MetadataProvider @@ -514,6 +513,12 @@ func (c *coordinator) rebalanceCluster() error { return nil } +func (c *coordinator) GetServers() []model.ServerAddress { + c.RLock() + defer c.RUnlock() + return c.ClusterConfig.Servers +} + func (c *coordinator) FindServerByInternalAddress(internalAddress string) *model.ServerAddress { return c.findServerByInternalAddress(c.ClusterConfig, internalAddress) } diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 939d0844..5d9a48d3 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -377,14 +377,22 @@ func (s *shardController) electLeader() error { } func (s *shardController) getRefreshedEnsemble() []model.ServerAddress { - refreshedEnsembleServiceInfo := make([]model.ServerAddress, 0) - for _, currentServer := range s.shardMetadata.Ensemble { - logicalNodeId := currentServer.Internal - if refreshedServiceInfo := s.coordinator.FindServerByInternalAddress(logicalNodeId); refreshedServiceInfo != nil { - refreshedEnsembleServiceInfo = append(refreshedEnsembleServiceInfo, *refreshedServiceInfo) + serversInfos := s.coordinator.GetServers() + // build a logic index here. + // todo: might introduce global index in the coordinator in the future + index := map[string]model.ServerAddress{} + for _, server := range serversInfos { + index[server.Internal] = server + } + + currentEnsemble := s.shardMetadata.Ensemble + refreshedEnsembleServiceInfo := make([]model.ServerAddress, len(currentEnsemble)) + for idx, candidate := range currentEnsemble { + if refreshedInfo, exist := index[candidate.Internal]; exist { + refreshedEnsembleServiceInfo[idx] = refreshedInfo continue } - refreshedEnsembleServiceInfo = append(refreshedEnsembleServiceInfo, currentServer) + refreshedEnsembleServiceInfo[idx] = candidate } return refreshedEnsembleServiceInfo } diff --git a/coordinator/impl/shard_controller_test.go b/coordinator/impl/shard_controller_test.go index a2148ee5..2075d5fe 100644 --- a/coordinator/impl/shard_controller_test.go +++ b/coordinator/impl/shard_controller_test.go @@ -365,8 +365,8 @@ func (m *mockCoordinator) WaitForNextUpdate(ctx context.Context, currentValue *p panic("not implemented") } -func (m *mockCoordinator) FindServerByInternalAddress(internalAddress string) *model.ServerAddress { - return nil +func (m *mockCoordinator) GetServers() []model.ServerAddress { + return []model.ServerAddress{} } func (m *mockCoordinator) InitiateLeaderElection(namespace string, shard int64, metadata model.ShardMetadata) error { From 742aeec542053adda1025a3d21ffd546c62ab914 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 19:00:25 +0800 Subject: [PATCH 07/18] support auto refresh --- coordinator/impl/coordinator.go | 26 ++++++++++---- coordinator/impl/coordinator_e2e_test.go | 6 ---- coordinator/impl/shard_controller.go | 42 ++++++++++++++++------- coordinator/impl/shard_controller_test.go | 4 +-- coordinator/model/cluster_status.go | 8 +++++ 5 files changed, 59 insertions(+), 27 deletions(-) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index a90b10e5..b93a048e 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -54,17 +54,20 @@ type Coordinator interface { NodeAvailabilityListener - GetServers() []model.ServerAddress + FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) + ClusterStatus() model.ClusterStatus } type coordinator struct { - sync.RWMutex + sync.Mutex assignmentsChanged common.ConditionContext MetadataProvider clusterConfigProvider func() (model.ClusterConfig, error) model.ClusterConfig + nodeIndexes sync.Map + clusterConfigChangeCh chan any shardControllers map[int64]ShardController @@ -101,6 +104,7 @@ func NewCoordinator(metadataProvider MetadataProvider, shardControllers: make(map[int64]ShardController), nodeControllers: make(map[string]NodeController), drainingNodes: make(map[string]NodeController), + nodeIndexes: sync.Map{}, rpc: rpc, log: slog.With( slog.String("component", "coordinator"), @@ -118,6 +122,7 @@ func NewCoordinator(metadataProvider MetadataProvider, for _, sa := range c.ClusterConfig.Servers { c.nodeControllers[sa.Internal] = NewNodeController(sa, c, c, c.rpc) + c.nodeIndexes.Store(sa.GetNodeId(), sa) } if c.clusterStatus == nil { @@ -471,6 +476,10 @@ func (c *coordinator) handleClusterConfigUpdated() error { } } + for _, sc := range c.shardControllers { + sc.SyncServerInfo() + } + c.ClusterConfig = newClusterConfig c.clusterStatus = clusterStatus @@ -513,10 +522,12 @@ func (c *coordinator) rebalanceCluster() error { return nil } -func (c *coordinator) GetServers() []model.ServerAddress { - c.RLock() - defer c.RUnlock() - return c.ClusterConfig.Servers +func (c *coordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) { + if info, exist := c.nodeIndexes.Load(id); exist { + address := info.(model.ServerAddress) + return &address, true + } + return nil, false } func (c *coordinator) FindServerByInternalAddress(internalAddress string) *model.ServerAddress { @@ -536,6 +547,8 @@ func (*coordinator) findServerByInternalAddress(newClusterConfig model.ClusterCo func (c *coordinator) checkClusterNodeChanges(newClusterConfig model.ClusterConfig) { // Check for nodes to add for _, sa := range newClusterConfig.Servers { + c.nodeIndexes.Store(sa.GetNodeId(), sa) + if _, ok := c.nodeControllers[sa.Internal]; ok { continue } @@ -559,6 +572,7 @@ func (c *coordinator) checkClusterNodeChanges(newClusterConfig model.ClusterConf } c.log.Info("Detected a removed node", slog.Any("addr", ia)) + c.nodeIndexes.Delete(model.NodeId(ia)) // Moved the node delete(c.nodeControllers, ia) nc.SetStatus(Draining) diff --git a/coordinator/impl/coordinator_e2e_test.go b/coordinator/impl/coordinator_e2e_test.go index 28783ded..6043b8b9 100644 --- a/coordinator/impl/coordinator_e2e_test.go +++ b/coordinator/impl/coordinator_e2e_test.go @@ -816,12 +816,6 @@ func TestCoordinator_RefreshServerInfo(t *testing.T) { clusterConfig.Servers = clusterServer configChangesCh <- nil - // new term - coordinatorInstance := c.(*coordinator) - controller := coordinatorInstance.shardControllers[0] - controllerInstance := controller.(*shardController) - controllerInstance.electionOp <- nil - assert.Eventually(t, func() bool { for _, ns := range c.ClusterStatus().Namespaces { for _, shard := range ns.Shards { diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 5d9a48d3..8e2dee12 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -64,6 +64,8 @@ type ShardController interface { HandleNodeFailure(failedNode model.ServerAddress) + SyncServerInfo() + SwapNode(from model.ServerAddress, to model.ServerAddress) error DeleteShard() @@ -77,7 +79,7 @@ type shardController struct { shard int64 namespaceConfig *model.NamespaceConfig shardMetadata model.ShardMetadata - shardMetadataMutex sync.Mutex + shardMetadataMutex sync.RWMutex rpc RpcProvider coordinator Coordinator @@ -139,6 +141,8 @@ func NewShardController(namespace string, shard int64, namespaceConfig *model.Na s.ctx, s.cancel = context.WithCancel(context.Background()) + s.SyncServerInfo() + s.log.Info( "Started shard controller", slog.Any("shard-metadata", s.shardMetadata), @@ -196,7 +200,7 @@ func (s *shardController) run() { case a := <-s.newTermAndAddFollowerOp: s.internalNewTermAndAddFollower(a.ctx, a.node, a.res) - case <-s.electionOp: // for testing + case <-s.electionOp: s.electLeaderWithRetries() } } @@ -214,7 +218,7 @@ func (s *shardController) handleNodeFailure(failedNode model.ServerAddress) { ) if s.shardMetadata.Leader != nil && - *s.shardMetadata.Leader == failedNode { + s.shardMetadata.Leader.Internal == failedNode.Internal { s.log.Info( "Detected failure on shard leader", slog.Any("leader", failedNode), @@ -377,19 +381,11 @@ func (s *shardController) electLeader() error { } func (s *shardController) getRefreshedEnsemble() []model.ServerAddress { - serversInfos := s.coordinator.GetServers() - // build a logic index here. - // todo: might introduce global index in the coordinator in the future - index := map[string]model.ServerAddress{} - for _, server := range serversInfos { - index[server.Internal] = server - } - currentEnsemble := s.shardMetadata.Ensemble refreshedEnsembleServiceInfo := make([]model.ServerAddress, len(currentEnsemble)) for idx, candidate := range currentEnsemble { - if refreshedInfo, exist := index[candidate.Internal]; exist { - refreshedEnsembleServiceInfo[idx] = refreshedInfo + if refreshedInfo, exist := s.coordinator.FindNodeInfoById(candidate.GetNodeId()); exist { + refreshedEnsembleServiceInfo[idx] = *refreshedInfo continue } refreshedEnsembleServiceInfo[idx] = candidate @@ -894,6 +890,26 @@ func (s *shardController) waitForFollowersToCatchUp(ctx context.Context, leader return nil } +func (s *shardController) SyncServerInfo() { + s.shardMetadataMutex.RLock() + exist := false + for _, candidate := range s.shardMetadata.Ensemble { + if newInfo, ok := s.coordinator.FindNodeInfoById(candidate.GetNodeId()); ok { + if *newInfo != candidate { + exist = true + break + } + } + } + if !exist { + s.shardMetadataMutex.RUnlock() + return + } + s.shardMetadataMutex.RUnlock() + s.log.Info("node info changed, start a new leader election") + s.electionOp <- nil +} + func listContains(list []model.ServerAddress, sa model.ServerAddress) bool { for _, item := range list { if item.Public == sa.Public && item.Internal == sa.Internal { diff --git a/coordinator/impl/shard_controller_test.go b/coordinator/impl/shard_controller_test.go index 2075d5fe..1906b845 100644 --- a/coordinator/impl/shard_controller_test.go +++ b/coordinator/impl/shard_controller_test.go @@ -365,8 +365,8 @@ func (m *mockCoordinator) WaitForNextUpdate(ctx context.Context, currentValue *p panic("not implemented") } -func (m *mockCoordinator) GetServers() []model.ServerAddress { - return []model.ServerAddress{} +func (m *mockCoordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) { + return nil, false } func (m *mockCoordinator) InitiateLeaderElection(namespace string, shard int64, metadata model.ShardMetadata) error { diff --git a/coordinator/model/cluster_status.go b/coordinator/model/cluster_status.go index defd45f8..60177887 100644 --- a/coordinator/model/cluster_status.go +++ b/coordinator/model/cluster_status.go @@ -14,6 +14,8 @@ package model +type NodeId string + type ServerAddress struct { // Public is the endpoint that is advertised to clients Public string `json:"public" yaml:"public"` @@ -22,6 +24,12 @@ type ServerAddress struct { Internal string `json:"internal" yaml:"internal"` } +func (s *ServerAddress) GetNodeId() NodeId { + // use the internal address as the node id by default. + // todo: introduce node id in the future + return NodeId(s.Internal) +} + type Int32HashRange struct { // The minimum inclusive hash that the shard can contain Min uint32 `json:"min"` From 9ed5678d7df6635a4d95da25b9b24cd51fb01392 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 19:05:03 +0800 Subject: [PATCH 08/18] fix lint --- coordinator/impl/coordinator.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index b93a048e..7b124ed2 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -524,7 +524,10 @@ func (c *coordinator) rebalanceCluster() error { func (c *coordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) { if info, exist := c.nodeIndexes.Load(id); exist { - address := info.(model.ServerAddress) + address, ok := info.(model.ServerAddress) + if !ok { + panic("unexpected cast") + } return &address, true } return nil, false From 2e0656004e72967caa9a210e7c0438c9cc1555eb Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 20:39:15 +0800 Subject: [PATCH 09/18] remove unused method --- coordinator/impl/coordinator.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index 7b124ed2..b6df3d16 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -533,10 +533,6 @@ func (c *coordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, b return nil, false } -func (c *coordinator) FindServerByInternalAddress(internalAddress string) *model.ServerAddress { - return c.findServerByInternalAddress(c.ClusterConfig, internalAddress) -} - func (*coordinator) findServerByInternalAddress(newClusterConfig model.ClusterConfig, server string) *model.ServerAddress { for _, s := range newClusterConfig.Servers { if server == s.Internal { From f4f71b0fb48afe58ff6fd489c504376f351dc325 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 20:44:40 +0800 Subject: [PATCH 10/18] use server address to replace node id --- coordinator/impl/coordinator.go | 16 ++++++++-------- coordinator/impl/shard_controller.go | 10 +++++----- coordinator/impl/shard_controller_test.go | 2 +- coordinator/model/cluster_status.go | 8 -------- 4 files changed, 14 insertions(+), 22 deletions(-) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index b6df3d16..c01a2a06 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -54,7 +54,7 @@ type Coordinator interface { NodeAvailabilityListener - FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) + FindNodeInfoByInternalAddress(internalAddress string) (*model.ServerAddress, bool) ClusterStatus() model.ClusterStatus } @@ -66,7 +66,7 @@ type coordinator struct { MetadataProvider clusterConfigProvider func() (model.ClusterConfig, error) model.ClusterConfig - nodeIndexes sync.Map + serverIndexes sync.Map clusterConfigChangeCh chan any @@ -104,7 +104,7 @@ func NewCoordinator(metadataProvider MetadataProvider, shardControllers: make(map[int64]ShardController), nodeControllers: make(map[string]NodeController), drainingNodes: make(map[string]NodeController), - nodeIndexes: sync.Map{}, + serverIndexes: sync.Map{}, rpc: rpc, log: slog.With( slog.String("component", "coordinator"), @@ -122,7 +122,7 @@ func NewCoordinator(metadataProvider MetadataProvider, for _, sa := range c.ClusterConfig.Servers { c.nodeControllers[sa.Internal] = NewNodeController(sa, c, c, c.rpc) - c.nodeIndexes.Store(sa.GetNodeId(), sa) + c.serverIndexes.Store(sa.Internal, sa) } if c.clusterStatus == nil { @@ -522,8 +522,8 @@ func (c *coordinator) rebalanceCluster() error { return nil } -func (c *coordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) { - if info, exist := c.nodeIndexes.Load(id); exist { +func (c *coordinator) FindNodeInfoByInternalAddress(internalAddress string) (*model.ServerAddress, bool) { + if info, exist := c.serverIndexes.Load(internalAddress); exist { address, ok := info.(model.ServerAddress) if !ok { panic("unexpected cast") @@ -546,7 +546,7 @@ func (*coordinator) findServerByInternalAddress(newClusterConfig model.ClusterCo func (c *coordinator) checkClusterNodeChanges(newClusterConfig model.ClusterConfig) { // Check for nodes to add for _, sa := range newClusterConfig.Servers { - c.nodeIndexes.Store(sa.GetNodeId(), sa) + c.serverIndexes.Store(sa.Internal, sa) if _, ok := c.nodeControllers[sa.Internal]; ok { continue @@ -571,7 +571,7 @@ func (c *coordinator) checkClusterNodeChanges(newClusterConfig model.ClusterConf } c.log.Info("Detected a removed node", slog.Any("addr", ia)) - c.nodeIndexes.Delete(model.NodeId(ia)) + c.serverIndexes.Delete(ia) // Moved the node delete(c.nodeControllers, ia) nc.SetStatus(Draining) diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 8e2dee12..c6d64cc1 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -382,15 +382,15 @@ func (s *shardController) electLeader() error { func (s *shardController) getRefreshedEnsemble() []model.ServerAddress { currentEnsemble := s.shardMetadata.Ensemble - refreshedEnsembleServiceInfo := make([]model.ServerAddress, len(currentEnsemble)) + refreshedEnsembleServiceAddress := make([]model.ServerAddress, len(currentEnsemble)) for idx, candidate := range currentEnsemble { - if refreshedInfo, exist := s.coordinator.FindNodeInfoById(candidate.GetNodeId()); exist { - refreshedEnsembleServiceInfo[idx] = *refreshedInfo + if refreshedAddress, exist := s.coordinator.FindNodeInfoByInternalAddress(candidate.Internal); exist { + refreshedEnsembleServiceAddress[idx] = *refreshedAddress continue } - refreshedEnsembleServiceInfo[idx] = candidate + refreshedEnsembleServiceAddress[idx] = candidate } - return refreshedEnsembleServiceInfo + return refreshedEnsembleServiceAddress } func (s *shardController) deletingRemovedNodes() error { diff --git a/coordinator/impl/shard_controller_test.go b/coordinator/impl/shard_controller_test.go index 1906b845..8b957217 100644 --- a/coordinator/impl/shard_controller_test.go +++ b/coordinator/impl/shard_controller_test.go @@ -365,7 +365,7 @@ func (m *mockCoordinator) WaitForNextUpdate(ctx context.Context, currentValue *p panic("not implemented") } -func (m *mockCoordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) { +func (m *mockCoordinator) FindNodeInfoByInternalAddress(_ string) (*model.ServerAddress, bool) { return nil, false } diff --git a/coordinator/model/cluster_status.go b/coordinator/model/cluster_status.go index 60177887..defd45f8 100644 --- a/coordinator/model/cluster_status.go +++ b/coordinator/model/cluster_status.go @@ -14,8 +14,6 @@ package model -type NodeId string - type ServerAddress struct { // Public is the endpoint that is advertised to clients Public string `json:"public" yaml:"public"` @@ -24,12 +22,6 @@ type ServerAddress struct { Internal string `json:"internal" yaml:"internal"` } -func (s *ServerAddress) GetNodeId() NodeId { - // use the internal address as the node id by default. - // todo: introduce node id in the future - return NodeId(s.Internal) -} - type Int32HashRange struct { // The minimum inclusive hash that the shard can contain Min uint32 `json:"min"` From 938d095217b82185db08098d1d4c89a3f1a968cc Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 20:45:21 +0800 Subject: [PATCH 11/18] fix typo --- coordinator/impl/coordinator.go | 4 ++-- coordinator/impl/shard_controller.go | 2 +- coordinator/impl/shard_controller_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index c01a2a06..4835e81e 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -54,7 +54,7 @@ type Coordinator interface { NodeAvailabilityListener - FindNodeInfoByInternalAddress(internalAddress string) (*model.ServerAddress, bool) + FindServerAddressByInternalAddress(internalAddress string) (*model.ServerAddress, bool) ClusterStatus() model.ClusterStatus } @@ -522,7 +522,7 @@ func (c *coordinator) rebalanceCluster() error { return nil } -func (c *coordinator) FindNodeInfoByInternalAddress(internalAddress string) (*model.ServerAddress, bool) { +func (c *coordinator) FindServerAddressByInternalAddress(internalAddress string) (*model.ServerAddress, bool) { if info, exist := c.serverIndexes.Load(internalAddress); exist { address, ok := info.(model.ServerAddress) if !ok { diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index c6d64cc1..885befd4 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -384,7 +384,7 @@ func (s *shardController) getRefreshedEnsemble() []model.ServerAddress { currentEnsemble := s.shardMetadata.Ensemble refreshedEnsembleServiceAddress := make([]model.ServerAddress, len(currentEnsemble)) for idx, candidate := range currentEnsemble { - if refreshedAddress, exist := s.coordinator.FindNodeInfoByInternalAddress(candidate.Internal); exist { + if refreshedAddress, exist := s.coordinator.FindServerAddressByInternalAddress(candidate.Internal); exist { refreshedEnsembleServiceAddress[idx] = *refreshedAddress continue } diff --git a/coordinator/impl/shard_controller_test.go b/coordinator/impl/shard_controller_test.go index 8b957217..eeef2850 100644 --- a/coordinator/impl/shard_controller_test.go +++ b/coordinator/impl/shard_controller_test.go @@ -365,7 +365,7 @@ func (m *mockCoordinator) WaitForNextUpdate(ctx context.Context, currentValue *p panic("not implemented") } -func (m *mockCoordinator) FindNodeInfoByInternalAddress(_ string) (*model.ServerAddress, bool) { +func (m *mockCoordinator) FindServerAddressByInternalAddress(_ string) (*model.ServerAddress, bool) { return nil, false } From 4b9f0520f61015858771897d559476342cb23407 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 20:45:42 +0800 Subject: [PATCH 12/18] fix typo --- coordinator/impl/coordinator.go | 2 +- coordinator/impl/shard_controller.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index 4835e81e..1885e078 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -477,7 +477,7 @@ func (c *coordinator) handleClusterConfigUpdated() error { } for _, sc := range c.shardControllers { - sc.SyncServerInfo() + sc.SyncServerAddress() } c.ClusterConfig = newClusterConfig diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 885befd4..5c4a8194 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -64,7 +64,7 @@ type ShardController interface { HandleNodeFailure(failedNode model.ServerAddress) - SyncServerInfo() + SyncServerAddress() SwapNode(from model.ServerAddress, to model.ServerAddress) error DeleteShard() @@ -141,7 +141,7 @@ func NewShardController(namespace string, shard int64, namespaceConfig *model.Na s.ctx, s.cancel = context.WithCancel(context.Background()) - s.SyncServerInfo() + s.SyncServerAddress() s.log.Info( "Started shard controller", @@ -890,7 +890,7 @@ func (s *shardController) waitForFollowersToCatchUp(ctx context.Context, leader return nil } -func (s *shardController) SyncServerInfo() { +func (s *shardController) SyncServerAddress() { s.shardMetadataMutex.RLock() exist := false for _, candidate := range s.shardMetadata.Ensemble { From 6697e6f618738e3fd0363d73fe1a306a0000f79f Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 20:48:11 +0800 Subject: [PATCH 13/18] fix compile error --- coordinator/impl/shard_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 5c4a8194..db0c5dd1 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -894,7 +894,7 @@ func (s *shardController) SyncServerAddress() { s.shardMetadataMutex.RLock() exist := false for _, candidate := range s.shardMetadata.Ensemble { - if newInfo, ok := s.coordinator.FindNodeInfoById(candidate.GetNodeId()); ok { + if newInfo, ok := s.coordinator.FindServerAddressByInternalAddress(candidate.Internal); ok { if *newInfo != candidate { exist = true break From 673aabe6bad38a7c5c3205b27d721ba91e0661dd Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 20:54:21 +0800 Subject: [PATCH 14/18] sync address first --- coordinator/impl/coordinator.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index 1885e078..91251059 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -452,6 +452,10 @@ func (c *coordinator) handleClusterConfigUpdated() error { slog.Any("metadataVersion", c.metadataVersion), ) + for _, sc := range c.shardControllers { + sc.SyncServerAddress() + } + c.checkClusterNodeChanges(newClusterConfig) clusterStatus, shardsToAdd, shardsToDelete := applyClusterChanges(&newClusterConfig, c.clusterStatus) @@ -476,10 +480,6 @@ func (c *coordinator) handleClusterConfigUpdated() error { } } - for _, sc := range c.shardControllers { - sc.SyncServerAddress() - } - c.ClusterConfig = newClusterConfig c.clusterStatus = clusterStatus From 8d45a7999b6291e3fae55bf325f9dab2fd4a9878 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 21:00:33 +0800 Subject: [PATCH 15/18] don't need sync address everytime --- coordinator/impl/shard_controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index db0c5dd1..34aa0129 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -141,8 +141,6 @@ func NewShardController(namespace string, shard int64, namespaceConfig *model.Na s.ctx, s.cancel = context.WithCancel(context.Background()) - s.SyncServerAddress() - s.log.Info( "Started shard controller", slog.Any("shard-metadata", s.shardMetadata), @@ -175,6 +173,8 @@ func (s *shardController) run() { if !s.verifyCurrentEnsemble() { s.electLeaderWithRetries() + } else { + s.SyncServerAddress() } } From e200f9b78811621df26998294bb8e43a99969e01 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 21:03:39 +0800 Subject: [PATCH 16/18] fix typo --- coordinator/impl/shard_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 34aa0129..456d9223 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -906,7 +906,7 @@ func (s *shardController) SyncServerAddress() { return } s.shardMetadataMutex.RUnlock() - s.log.Info("node info changed, start a new leader election") + s.log.Info("server address changed, start a new leader election") s.electionOp <- nil } From 6b928557709b5206af0ceb34608817903311378c Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 21:15:17 +0800 Subject: [PATCH 17/18] add debug log --- coordinator/impl/shard_controller.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 456d9223..80e9bfa4 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -20,6 +20,7 @@ import ( "io" "log/slog" "math/rand" + "reflect" "sync" "time" @@ -390,6 +391,12 @@ func (s *shardController) getRefreshedEnsemble() []model.ServerAddress { } refreshedEnsembleServiceAddress[idx] = candidate } + if s.log.Enabled(s.ctx, slog.LevelDebug) { + if !reflect.DeepEqual(currentEnsemble, refreshedEnsembleServiceAddress) { + s.log.Info("refresh the shard ensemble server address", slog.Any("current-ensemble", currentEnsemble), + slog.Any("new-ensemble", refreshedEnsembleServiceAddress)) + } + } return refreshedEnsembleServiceAddress } From 17b7199d901555fdf8f067c6b74358aebbb02bc1 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 7 Nov 2024 21:18:14 +0800 Subject: [PATCH 18/18] use internal address as logical node id --- coordinator/impl/shard_controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 80e9bfa4..36ebf421 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -919,7 +919,7 @@ func (s *shardController) SyncServerAddress() { func listContains(list []model.ServerAddress, sa model.ServerAddress) bool { for _, item := range list { - if item.Public == sa.Public && item.Internal == sa.Internal { + if item.Internal == sa.Internal { return true } } @@ -938,7 +938,7 @@ func mergeLists[T any](lists ...[]T) []T { func replaceInList(list []model.ServerAddress, oldServerAddress, newServerAddress model.ServerAddress) []model.ServerAddress { var res []model.ServerAddress for _, item := range list { - if item.Public != oldServerAddress.Public && item.Internal != oldServerAddress.Internal { + if item.Internal != oldServerAddress.Internal { res = append(res, item) } }