diff --git a/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java b/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java index 5ce908de..b65bc24b 100644 --- a/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java +++ b/client/src/main/java/io/streamnative/oxia/client/shard/ShardManager.java @@ -239,6 +239,7 @@ static ShardAssignmentChanges computeShardLeaderChanges( oldAssignments.values().stream() .filter(s -> newAssignments.containsKey(s.id())) .filter(s -> !newAssignments.get(s.id()).leader().equals(s.leader())) + .map(s -> newAssignments.get(s.id())) // return reassigned leader .collect(toSet()); return new ShardAssignmentChanges( unmodifiableSet(added), unmodifiableSet(removed), unmodifiableSet(changed)); diff --git a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java index 62046b40..43c9a40a 100644 --- a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java @@ -119,6 +119,15 @@ void recomputeShardHashBoundariesWithSameValue() { assertThat(a).isUnmodifiable(); }); } + + @Test + void recomputeAssignmentsWithSameBoundariesAndDiffLeader() { + final var existing = Map.of(1L, new Shard(1L, "leader 1", new HashRange(0, 4294967295L))); + final var updates = Set.of(new Shard(1L, "leader 2", new HashRange(0, 4294967295L))); + final var updatedMap = ShardManager.recomputeShardHashBoundaries(existing, updates); + final var changes = ShardManager.computeShardLeaderChanges(existing, updatedMap); + assertThat(changes.reassigned()).isEqualTo(updates); + } } @Nested