Skip to content

Commit

Permalink
fix: Choose wrong shard leader during balance channel(#29525) (#29532)
Browse files Browse the repository at this point in the history
issue: #29523
pr: #29525

readable shard leader should still be the old one during channel
balance, if the new shard leader is not ready.
This PR fixed that query coord choose wrong shard leader during balance
channel

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Dec 28, 2023
1 parent 687eb39 commit 07ef52e
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,9 +881,8 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
log := log.With(zap.String("channel", channel.GetChannelName()))

leaders := s.dist.LeaderViewManager.GetLeadersByShard(channel.GetChannelName())
leaders = filterDupLeaders(s.meta.ReplicaManager, leaders)
ids := make([]int64, 0, len(leaders))
addrs := make([]string, 0, len(leaders))

readableLeaders := make(map[int64]*meta.LeaderView)

var channelErr error
if len(leaders) == 0 {
Expand Down Expand Up @@ -944,11 +943,10 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
continue
}

ids = append(ids, info.ID())
addrs = append(addrs, info.Addr())
readableLeaders[leader.ID] = leader
}

if len(ids) == 0 {
if len(readableLeaders) == 0 {
msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
log.Warn(msg, zap.Error(channelErr))
resp.Status = merr.Status(
Expand All @@ -957,6 +955,15 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
return resp, nil
}

readableLeaders = filterDupLeaders(s.meta.ReplicaManager, readableLeaders)
ids := make([]int64, 0, len(leaders))
addrs := make([]string, 0, len(leaders))
for _, leader := range readableLeaders {
info := s.nodeMgr.Get(leader.ID)
ids = append(ids, info.ID())
addrs = append(addrs, info.Addr())
}

resp.Shards = append(resp.Shards, &querypb.ShardLeadersList{
ChannelName: channel.GetChannelName(),
NodeIds: ids,
Expand Down

0 comments on commit 07ef52e

Please sign in to comment.