Skip to content

Commit

Permalink
Adjusted tests
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m committed Feb 27, 2025
1 parent 84c4411 commit 044913a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ public List<KafkaParameters> toRemoteKafkaProperties(
.getRemoteDatacenters();

return this.clusters.stream()
.filter(cluster -> remoteDatacenters.contains(cluster.getDatacenter()))
.filter(
cluster ->
remoteDatacenters.contains(cluster.getDatacenter())
&& !cluster.getDatacenter().equals(currentDatacenterName))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,31 @@ public void shouldThrowExceptionForInvalidDatacenterName() {
IllegalArgumentException.class,
() -> kafkaClustersProperties.toRemoteKafkaProperties(datacenterNameProvider));
}

@Test
public void shouldFilterOutLocalDatacenterFromRemoteDatacenters() {
// given
KafkaProperties localCluster = new KafkaProperties();
localCluster.setDatacenter("dc1");
// Misconfigured: remote datacenters list contains "dc1" itself.
localCluster.setRemoteDatacenters(List.of("dc1", "dc2"));

KafkaProperties remoteCluster = new KafkaProperties();
remoteCluster.setDatacenter("dc2");

kafkaClustersProperties.setClusters(Arrays.asList(localCluster, remoteCluster));
when(datacenterNameProvider.getDatacenterName()).thenReturn("dc1");

// when
List<KafkaParameters> remoteKafkaProperties =
kafkaClustersProperties.toRemoteKafkaProperties(datacenterNameProvider);

// then
Set<String> remoteDCs =
remoteKafkaProperties.stream()
.map(KafkaParameters::getDatacenter)
.collect(Collectors.toSet());
assertEquals(1, remoteKafkaProperties.size());
assertEquals(Set.of("dc2"), remoteDCs);
}
}

0 comments on commit 044913a

Please sign in to comment.