fix: use internal address as logical node id for balancer
mattisonchao committed Nov 9, 2024
1 parent fb90db8 commit 6e1a46a
Showing 4 changed files with 87 additions and 48 deletions.
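
The gist of the change: the rebalancer's bookkeeping maps were previously keyed by the full model.ServerAddress value, so a node whose advertised addresses changed would no longer match its own map entry and could look like one removed server plus one brand-new server. This commit keys the maps by the internal address string, treated as the stable logical node id, and wraps the full address together with the node's shard set in a new ServerContext. The standalone Go sketch below illustrates the pitfall; it is not code from this repository, and the Public field name and port values are only assumed for the example (the diff itself only references Internal).

package main

import "fmt"

// Simplified stand-in for the coordinator's server address type.
// The Internal field appears in the diff; Public is assumed here
// purely to illustrate a second, changeable address.
type ServerAddress struct {
    Public   string
    Internal string
}

func main() {
    before := ServerAddress{Public: "10.0.0.1:6648", Internal: "node-1:6649"}
    shardsByAddr := map[ServerAddress][]int64{before: {0, 1, 2}}

    // The same logical node comes back with a different public address.
    after := ServerAddress{Public: "10.0.0.2:6648", Internal: "node-1:6649"}

    // Keyed by the whole struct, the lookup misses: the node appears new,
    // and its old entry appears to belong to a removed server.
    _, ok := shardsByAddr[after]
    fmt.Println("lookup by full address:", ok) // false

    // Keyed by the internal address (the stable logical node id),
    // the same node still resolves to its existing shard assignment.
    shardsByID := map[string][]int64{before.Internal: {0, 1, 2}}
    _, ok = shardsByID[after.Internal]
    fmt.Println("lookup by internal address:", ok) // true
}

The new end-to-end check added in coordinator_e2e_test.go below follows the same reasoning: after the server info is refreshed, no shard should report any RemovedNodes.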
80 changes: 41 additions & 39 deletions coordinator/impl/cluster_rebalance.go
@@ -28,6 +28,11 @@ type SwapNodeAction struct {
     To    model.ServerAddress
 }
 
+type ServerContext struct {
+    Addr   model.ServerAddress
+    Shards common.Set[int64]
+}
+
 // Make sure every server is assigned a similar number of shards
 // Output a list of actions to be taken to rebalance the cluster.
 func rebalanceCluster(servers []model.ServerAddress, currentStatus *model.ClusterStatus) []SwapNodeAction { //nolint:revive
@@ -49,11 +54,11 @@ outer:
     }
     if len(deletedServers) > 0 {
         slog.Debug("Deleted servers: ")
-        for ds, shards := range deletedServers {
+        for ds, context := range deletedServers {
             slog.Debug(
                 "",
-                slog.String("server", ds.Internal),
-                slog.Int("count", shards.Count()),
+                slog.String("server", ds),
+                slog.Int("count", context.Shards.Count()),
             )
         }
     }
@@ -62,26 +67,26 @@ outer:
     // First try to reassign shards from the removed servers.
     // We do it one by one, by placing in the least loaded server
     if len(deletedServers) > 0 {
-        ds, shards := getFirstEntry(deletedServers)
+        id, context := getFirstEntry(deletedServers)
 
         for j := serversCount - 1; j >= 0; j-- {
             to := rankings[j]
-            eligibleShards := shards.Complement(to.Shards)
+            eligibleShards := context.Shards.Complement(to.Shards)
 
             if !eligibleShards.IsEmpty() {
                 a := SwapNodeAction{
                     Shard: eligibleShards.GetSorted()[0],
-                    From:  ds,
+                    From:  context.Addr,
                     To:    to.Addr,
                 }
 
-                shards.Remove(a.Shard)
-                if shards.IsEmpty() {
-                    delete(deletedServers, ds)
+                context.Shards.Remove(a.Shard)
+                if context.Shards.IsEmpty() {
+                    delete(deletedServers, id)
                 } else {
-                    deletedServers[ds] = shards
+                    deletedServers[id] = context
                 }
-                shardsPerServer[a.To].Add(a.Shard)
+                shardsPerServer[a.To.Internal].Shards.Add(a.Shard)
 
                 slog.Debug(
                     "Transfer from removed node",
@@ -117,8 +122,8 @@ outer:
                 To:    leastLoaded.Addr,
             }
 
-            shardsPerServer[a.From].Remove(a.Shard)
-            shardsPerServer[a.To].Add(a.Shard)
+            shardsPerServer[a.From.Internal].Shards.Remove(a.Shard)
+            shardsPerServer[a.To.Internal].Shards.Add(a.Shard)
 
             slog.Debug(
                 "Swapping nodes",
@@ -132,49 +137,46 @@ outer:
 }
 
 func getShardsPerServer(servers []model.ServerAddress, currentStatus *model.ClusterStatus) (
-    existingServers map[model.ServerAddress]common.Set[int64],
-    deletedServers map[model.ServerAddress]common.Set[int64]) {
-    existingServers = map[model.ServerAddress]common.Set[int64]{}
-    deletedServers = map[model.ServerAddress]common.Set[int64]{}
+    existingServers map[string]ServerContext,
+    deletedServers map[string]ServerContext) {
+    existingServers = map[string]ServerContext{}
+    deletedServers = map[string]ServerContext{}
 
     for _, s := range servers {
-        existingServers[s] = common.NewSet[int64]()
+        existingServers[s.Internal] = ServerContext{
+            Addr:   s,
+            Shards: common.NewSet[int64](),
+        }
     }
 
     for _, nss := range currentStatus.Namespaces {
         for shardId, shard := range nss.Shards {
-            for _, addr := range shard.Ensemble {
-                if _, ok := existingServers[addr]; ok {
-                    existingServers[addr].Add(shardId)
+            for _, candidate := range shard.Ensemble {
+                if _, ok := existingServers[candidate.Internal]; ok {
+                    existingServers[candidate.Internal].Shards.Add(shardId)
                     continue
                 }
 
                 // This server is getting removed
-                if _, ok := deletedServers[addr]; !ok {
-                    deletedServers[addr] = common.NewSet[int64]()
+                if _, ok := deletedServers[candidate.Internal]; !ok {
+                    deletedServers[candidate.Internal] = ServerContext{
+                        Addr:   candidate,
+                        Shards: common.NewSet[int64]()}
                 }
 
-                deletedServers[addr].Add(shardId)
+                deletedServers[candidate.Internal].Shards.Add(shardId)
             }
         }
     }
 
     return existingServers, deletedServers
 }
 
-type ServerRank struct {
-    Addr   model.ServerAddress
-    Shards common.Set[int64]
-}
-
-func getServerRanking(shardsPerServer map[model.ServerAddress]common.Set[int64]) []ServerRank {
-    res := make([]ServerRank, 0)
+func getServerRanking(shardsPerServer map[string]ServerContext) []ServerContext {
+    res := make([]ServerContext, 0)
 
-    for server, shards := range shardsPerServer {
-        res = append(res, ServerRank{
-            Addr:   server,
-            Shards: shards,
-        })
+    for _, context := range shardsPerServer {
+        res = append(res, context)
     }
 
     // Rank the servers from the one with most shards to the one with the least
@@ -191,14 +193,14 @@ func getServerRanking(shardsPerServer map[model.ServerAddress]common.Set[int64])
     return res
 }
 
-func getFirstEntry(m map[model.ServerAddress]common.Set[int64]) (model.ServerAddress, common.Set[int64]) {
-    keys := make([]model.ServerAddress, 0, len(m))
+func getFirstEntry(m map[string]ServerContext) (string, ServerContext) {
+    keys := make([]string, 0, len(m))
     for k := range m {
         keys = append(keys, k)
     }
 
     sort.SliceStable(keys, func(i, j int) bool {
-        return keys[i].Internal < keys[j].Internal
+        return keys[i] < keys[j]
     })
 
     return keys[0], m[keys[0]]
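
For readers who want to see the shape of the rebalance step in isolation, here is a small, self-contained sketch of the idea implemented above: servers are keyed by internal address, ranked by shard count, and one shard is moved from the most-loaded to the least-loaded server. The types, the toy data, and the "differ by more than one shard" threshold are simplifications for illustration only, not the coordinator's real model or common packages.

package main

import (
    "fmt"
    "sort"
)

// Simplified stand-in for the ServerContext introduced in this commit:
// the map key is the internal address, the value carries the shard set.
type serverContext struct {
    internal string
    shards   map[int64]bool
}

func main() {
    servers := map[string]serverContext{
        "node-1:6649": {"node-1:6649", map[int64]bool{0: true, 1: true, 2: true}},
        "node-2:6649": {"node-2:6649", map[int64]bool{3: true}},
    }

    // Rank servers from most to least loaded, as getServerRanking does.
    ranking := make([]serverContext, 0, len(servers))
    for _, sc := range servers {
        ranking = append(ranking, sc)
    }
    sort.Slice(ranking, func(i, j int) bool {
        return len(ranking[i].shards) > len(ranking[j].shards)
    })

    most, least := ranking[0], ranking[len(ranking)-1]
    if len(most.shards)-len(least.shards) > 1 {
        // Pick the lowest shard id owned by the most-loaded server
        // that the least-loaded server does not already hold.
        ids := make([]int64, 0, len(most.shards))
        for id := range most.shards {
            if !least.shards[id] {
                ids = append(ids, id)
            }
        }
        sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
        shard := ids[0]
        delete(most.shards, shard)
        least.shards[shard] = true
        fmt.Printf("move shard %d from %s to %s\n", shard, most.internal, least.internal)
    }
}

Running it prints a single proposed move, analogous to one SwapNodeAction produced by rebalanceCluster.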
29 changes: 22 additions & 7 deletions coordinator/impl/cluster_rebalance_test.go
@@ -73,15 +73,30 @@ func TestClusterRebalance_Count(t *testing.T) {
 
     count, deletedServers := getShardsPerServer([]model.ServerAddress{s1, s2, s3, s4, s5}, cs)
 
-    assert.Equal(t, map[model.ServerAddress]common.Set[int64]{
-        s1: common.NewSetFrom[int64]([]int64{0, 1, 2}),
-        s2: common.NewSetFrom[int64]([]int64{0, 1}),
-        s3: common.NewSetFrom[int64]([]int64{0, 2}),
-        s4: common.NewSetFrom[int64]([]int64{1, 2}),
-        s5: common.NewSet[int64](),
+    assert.Equal(t, map[string]ServerContext{
+        s1.Internal: {
+            s1,
+            common.NewSetFrom[int64]([]int64{0, 1, 2}),
+        },
+        s2.Internal: {
+            s2,
+            common.NewSetFrom[int64]([]int64{0, 1}),
+        },
+        s3.Internal: {
+            s3,
+            common.NewSetFrom[int64]([]int64{0, 2}),
+        },
+        s4.Internal: {
+            s4,
+            common.NewSetFrom[int64]([]int64{1, 2}),
+        },
+        s5.Internal: {
+            s5,
+            common.NewSet[int64](),
+        },
     }, count)
 
-    assert.Equal(t, map[model.ServerAddress]common.Set[int64]{}, deletedServers)
+    assert.Equal(t, map[string]ServerContext{}, deletedServers)
 }
 
 func TestClusterRebalance_Single(t *testing.T) {
4 changes: 2 additions & 2 deletions coordinator/impl/coordinator.go
@@ -452,12 +452,12 @@ func (c *coordinator) handleClusterConfigUpdated() error {
         slog.Any("metadataVersion", c.metadataVersion),
     )
 
-    c.checkClusterNodeChanges(newClusterConfig)
-
     for _, sc := range c.shardControllers {
         sc.SyncServerAddress()
     }
 
+    c.checkClusterNodeChanges(newClusterConfig)
+
     clusterStatus, shardsToAdd, shardsToDelete := applyClusterChanges(&newClusterConfig, c.clusterStatus)
 
     for shard, namespace := range shardsToAdd {
22 changes: 22 additions & 0 deletions coordinator/impl/coordinator_e2e_test.go
@@ -816,6 +816,28 @@ func TestCoordinator_RefreshServerInfo(t *testing.T) {
     clusterConfig.Servers = clusterServer
     configChangesCh <- nil
 
+    removeNodeHappened := false
+    timeout, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
+outer:
+    for {
+        select {
+        case <-timeout.Done():
+            break outer
+        default:
+            time.Sleep(100 * time.Millisecond)
+            for _, ns := range c.ClusterStatus().Namespaces {
+                for _, shard := range ns.Shards {
+                    if shard.RemovedNodes == nil || len(shard.RemovedNodes) == 0 {
+                        continue
+                    }
+                    removeNodeHappened = true
+                }
+            }
+        }
+    }
+    cancelFunc()
+    assert.False(t, removeNodeHappened)
+
     assert.Eventually(t, func() bool {
         for _, ns := range c.ClusterStatus().Namespaces {
             for _, shard := range ns.Shards {
