Skip to content

Commit

Permalink
Integration tests framework mszczygiel part4 (#1804)
Browse files Browse the repository at this point in the history
Rewriting tests
  • Loading branch information
szczygiel-m authored Dec 29, 2023
1 parent ea65ff7 commit 997b7fa
Show file tree
Hide file tree
Showing 26 changed files with 1,810 additions and 1,520 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package pl.allegro.tech.hermes.env;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.JsonToAvroMigrationKafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic;

public class BrokerOperations {

private static final int DEFAULT_PARTITIONS = 2;
private static final int DEFAULT_REPLICATION_FACTOR = 1;

private final AdminClient adminClient;

private final KafkaNamesMapper kafkaNamesMapper;

public BrokerOperations(String brokerList, String namespace) {
this.adminClient = brokerAdminClient(brokerList);
String namespaceSeparator = "_";
this.kafkaNamesMapper = new JsonToAvroMigrationKafkaNamesMapper(namespace, namespaceSeparator);
}

public List<ConsumerGroupOffset> getTopicPartitionsOffsets(SubscriptionName subscriptionName) {
ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscriptionName);

Map<TopicPartition, OffsetAndMetadata> currentOffsets = getTopicCurrentOffsets(consumerGroupId);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = getEndOffsets(new ArrayList<>(currentOffsets.keySet()));
return currentOffsets.keySet()
.stream()
.map(partition -> new ConsumerGroupOffset(
currentOffsets.get(partition).offset(),
endOffsets.get(partition).offset())
).collect(Collectors.toList());
}

private Map<TopicPartition, OffsetAndMetadata> getTopicCurrentOffsets(ConsumerGroupId consumerGroupId) {
try {
return adminClient.listConsumerGroupOffsets(consumerGroupId.asString()).partitionsToOffsetAndMetadata().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> getEndOffsets(List<TopicPartition> partitions) {
try {
ListOffsetsResult listOffsetsResult = adminClient.listOffsets(
partitions.stream().collect(toMap(Function.identity(), p -> OffsetSpec.latest())));
return listOffsetsResult.all().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public void createTopic(String topicName) {
Topic topic = topic(topicName).build();
kafkaNamesMapper.toKafkaTopics(topic)
.forEach(kafkaTopic -> createTopic(kafkaTopic.name()));
}

private void createTopic(KafkaTopicName topicName) {
try {
NewTopic topic = new NewTopic(topicName.asString(), DEFAULT_PARTITIONS, (short) DEFAULT_REPLICATION_FACTOR);
adminClient.createTopics(singletonList(topic))
.all()
.get(1, MINUTES);
} catch (ExecutionException | TimeoutException | InterruptedException e) {
throw new RuntimeException(e);
}
}

public boolean topicExists(String topicName) {
Topic topic = topic(topicName).build();
return kafkaNamesMapper.toKafkaTopics(topic)
.allMatch(this::topicExists);
}

private boolean topicExists(KafkaTopic kafkaTopic) {
try {
return adminClient
.listTopics()
.names()
.get(1, MINUTES)
.contains(kafkaTopic.name().asString());
} catch (ExecutionException | TimeoutException | InterruptedException e) {
throw new RuntimeException(e);
}
}

private AdminClient brokerAdminClient(String brokerList) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(SECURITY_PROTOCOL_CONFIG, DEFAULT_SECURITY_PROTOCOL);
props.put(REQUEST_TIMEOUT_MS_CONFIG, 10000);
return AdminClient.create(props);
}

public static class ConsumerGroupOffset {
private final long currentOffset;
private final long endOffset;

ConsumerGroupOffset(long currentOffset, long endOffset) {
this.currentOffset = currentOffset;
this.endOffset = endOffset;
}

public boolean movedToEnd() {
return currentOffset == endOffset;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pl.allegro.tech.hermes.api.Group;
import pl.allegro.tech.hermes.api.MessageFiltersVerificationInput;
import pl.allegro.tech.hermes.api.OAuthProvider;
import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest;
import pl.allegro.tech.hermes.api.OffsetRetransmissionDate;
import pl.allegro.tech.hermes.api.PatchData;
import pl.allegro.tech.hermes.api.Subscription;
Expand Down Expand Up @@ -91,8 +92,8 @@ public WebTestClient.ResponseSpec getSubscriptionMetrics(String topicQualifiedNa
return managementTestClient.getSubscriptionMetrics(topicQualifiedName, subscriptionName);
}

public void suspendSubscription(Topic topic, String subscription) {
managementTestClient.updateSubscriptionState(topic, subscription, Subscription.State.SUSPENDED)
public WebTestClient.ResponseSpec suspendSubscription(Topic topic, String subscription) {
return managementTestClient.updateSubscriptionState(topic, subscription, Subscription.State.SUSPENDED)
.expectStatus()
.is2xxSuccessful();
}
Expand Down Expand Up @@ -141,10 +142,8 @@ public int publishAvroUntilSuccess(String topicQualifiedName, byte[] body, Multi
return frontendTestClient.publishAvroUntilSuccess(topicQualifiedName, body, headers);
}

public void updateSubscription(Topic topic, String subscription, PatchData patch) {
managementTestClient.updateSubscription(topic, subscription, patch)
.expectStatus()
.is2xxSuccessful();
public WebTestClient.ResponseSpec updateSubscription(Topic topic, String subscription, PatchData patch) {
return managementTestClient.updateSubscription(topic, subscription, patch);
}

public WebTestClient.ResponseSpec publish(String topicQualifiedName, String body, MultiValueMap<String, String> headers) {
Expand Down Expand Up @@ -236,7 +235,7 @@ public WebTestClient.ResponseSpec getTopicMetrics(String qualifiedName) {
}

public WebTestClient.ResponseSpec listSubscriptions(String qualifiedName) {
return managementTestClient.listSubscriptions(qualifiedName);
return managementTestClient.listSubscriptions(qualifiedName, false);
}

public WebTestClient.ResponseSpec listTopics(String groupName) {
Expand Down Expand Up @@ -370,4 +369,56 @@ public WebTestClient.ResponseSpec updateOAuthProvider(String name, PatchData pat
public WebTestClient.ResponseSpec searchOwners(String source, String searchString) {
return managementTestClient.searchOwners(source, searchString);
}

public WebTestClient.ResponseSpec setMode(String mode) {
return managementTestClient.setMode(mode);
}

public WebTestClient.ResponseSpec getOfflineRetransmissionTasks() {
return managementTestClient.getOfflineRetransmissionTasks();
}

public WebTestClient.ResponseSpec deleteOfflineRetransmissionTask(String taskId) {
return managementTestClient.deleteOfflineRetransmissionTask(taskId);
}

public WebTestClient.ResponseSpec createOfflineRetransmissionTask(OfflineRetransmissionRequest request) {
return managementTestClient.createOfflineRetransmissionTask(request);
}

public WebTestClient.ResponseSpec createSubscription(Subscription subscription) {
return managementTestClient.createSubscription(subscription);
}

public WebTestClient.ResponseSpec listTrackedSubscriptions(String qualifiedName) {
return managementTestClient.listSubscriptions(qualifiedName, true);
}

public WebTestClient.ResponseSpec querySubscriptions(String qualifiedName, String query) {
return managementTestClient.querySubscriptions(qualifiedName, query);
}

public WebTestClient.ResponseSpec getSubscriptionHealth(String qualifiedTopicName, String name) {
return managementTestClient.getSubscriptionHealth(qualifiedTopicName, name);
}

public WebTestClient.ResponseSpec getConsumerGroupsDescription(String qualifiedTopicName, String subscriptionName) {
return managementTestClient.getConsumerGroupsDescription(qualifiedTopicName, subscriptionName);
}

public WebTestClient.ResponseSpec deleteGroup(String groupName) {
return managementTestClient.deleteGroup(groupName);
}

public WebTestClient.ResponseSpec updateGroup(String groupName, Group group) {
return managementTestClient.updateGroup(groupName, group);
}

public List<String> getGroups() {
return managementTestClient.getGroups();
}

public WebTestClient.ResponseSpec moveOffsetsToTheEnd(String topicQualifiedName, String subscriptionName) {
return managementTestClient.moveOffsetsToTheEnd(topicQualifiedName, subscriptionName);
}
}
Loading

0 comments on commit 997b7fa

Please sign in to comment.