Integration tests improvements
AnatolyPopov committed Nov 25, 2024
1 parent a31b0ca commit d9a235a
Showing 2 changed files with 114 additions and 141 deletions.
@@ -21,9 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
@@ -45,6 +43,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;

@@ -132,113 +131,74 @@ static List<Integer> getKafkaListenerPorts() throws IOException {
}
}

static List<String> consumeMessages(final String topic, final int expectedMessageCount,
final String bootstrapServers) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(topic));
final List<String> messages = new ArrayList<>();

// Poll messages from the topic
while (messages.size() < expectedMessageCount) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(5L);
for (final ConsumerRecord<byte[], byte[]> record : records) {
messages.add(new String(record.value(), StandardCharsets.UTF_8));
}
}

return messages;
}
static List<String> consumeByteMessages(final String topic, final int expectedMessageCount,
String bootstrapServers) {
final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class,
ByteArrayDeserializer.class);
final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, consumerProperties);
return objects.stream().map(String::new).collect(Collectors.toList());
}

static List<GenericRecord> consumeAvroMessages(final String topic, final int expectedMessageCount,
final String bootstrapServers, final String schemaRegistryUrl) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group-avro");
// Assuming string key
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Avro deserializer for values
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("schema.registry.url", schemaRegistryUrl); // URL of the schema registry
props.put("specific.avro.reader", "false"); // Use GenericRecord instead of specific Avro classes

try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(topic));
final List<GenericRecord> recordsList = new ArrayList<>();

// Poll messages from the topic
while (recordsList.size() < expectedMessageCount) {
final ConsumerRecords<String, GenericRecord> records = consumer.poll(500L);
for (final ConsumerRecord<String, GenericRecord> record : records) {
recordsList.add(record.value());
}
}

return recordsList;
}
final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class,
KafkaAvroDeserializer.class, schemaRegistryUrl);
return consumeMessages(topic, expectedMessageCount, consumerProperties);
}

static List<JsonNode> consumeJsonMessages(final String topic, final int expectedMessageCount,
final String bootstrapServers) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group-json");
// Assuming string key
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Json deserializer for values
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class,
JsonDeserializer.class);
return consumeMessages(topic, expectedMessageCount, consumerProperties);
}

try (KafkaConsumer<String, JsonNode> consumer = new KafkaConsumer<>(props)) {
static <K, V> List<V> consumeMessages(final String topic, final int expectedMessageCount,
final Properties consumerProperties) {
try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Collections.singletonList(topic));
final List<JsonNode> recordsList = new ArrayList<>();

// Poll messages from the topic
while (recordsList.size() < expectedMessageCount) {
final ConsumerRecords<String, JsonNode> records = consumer.poll(500L);
for (final ConsumerRecord<String, JsonNode> record : records) {
recordsList.add(record.value()); // Add the GenericRecord to the list
final List<V> recordValues = new ArrayList<>();
await().atMost(Duration.ofMinutes(2)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
for (final ConsumerRecord<K, V> record : records) {
recordValues.add(record.value());
}
}
assertThat(recordValues).hasSize(expectedMessageCount);
});
return recordValues;
}
}

return recordsList;
static Map<String, Object> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> consumer) throws IOException {
// Poll messages from the topic
final Map<String, Object> messages = new HashMap<>();
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord<byte[], byte[]> record : records) {
Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
});
messages.putAll(offsetRec);
}
return messages;
}

static <K, V> Properties getConsumerProperties(String bootstrapServers,
Class<? extends Deserializer<K>> keyDeserializer, Class<? extends Deserializer<V>> valueDeserializer,
String schemaRegistryUrl) {
final Properties props = getConsumerProperties(bootstrapServers, keyDeserializer, valueDeserializer);
props.put("specific.avro.reader", "false"); // Use GenericRecord instead of specific Avro classes
props.put("schema.registry.url", schemaRegistryUrl); // URL of the schema registry
return props;
}

static Map<String, Object> consumeOffsetStorageMessages(final String topic, final int expectedMessageCount,
final String bootstrapServer) throws ConnectException {
static <K, V> Properties getConsumerProperties(String bootstrapServers,
Class<? extends Deserializer<K>> keyDeserializer, Class<? extends Deserializer<V>> valueDeserializer) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(topic));

// Poll messages from the topic
final Map<String, Object> messages = new HashMap<>();
while (messages.size() < expectedMessageCount) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(5L));
for (final ConsumerRecord<byte[], byte[]> record : records) {
Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
});
messages.putAll(offsetRec);
}
}
return messages;

} catch (IOException e) {
throw new ConnectException("Error while consuming messages " + e.getMessage());
}
return props;
}
}
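
The refactoring above funnels every consumer path through one generic consumeMessages method, which now polls inside Awaitility's await().atMost(...).untilAsserted(...) instead of a hand-rolled while loop, and builds consumer configuration through the getConsumerProperties overloads. A minimal sketch of how a test could call the new helpers follows; the method name, topic, broker address, schema registry URL and record count are assumed placeholders, not values taken from this commit.

// Hypothetical usage sketch, written as if added to the same test utility class as the
// helpers in this diff (so the static consume* methods and their imports are in scope).
// The topic, broker address, schema registry URL and record count are placeholders.
void assertConnectorOutput() {
    final String topic = "test-topic";                 // placeholder topic name
    final String bootstrapServers = "localhost:9092";  // placeholder broker address
    final int expectedMessageCount = 10;               // placeholder record count

    // Raw byte[] records, decoded to UTF-8 strings by consumeByteMessages.
    final List<String> plain = consumeByteMessages(topic, expectedMessageCount, bootstrapServers);

    // Avro values also take the schema registry URL so the extra getConsumerProperties
    // overload can set schema.registry.url and specific.avro.reader.
    final List<GenericRecord> avro = consumeAvroMessages(topic, expectedMessageCount, bootstrapServers,
            "http://localhost:8081");                  // placeholder registry address

    // JSON values come back as Jackson JsonNode objects.
    final List<JsonNode> json = consumeJsonMessages(topic, expectedMessageCount, bootstrapServers);
}

Each call polls inside consumeMessages' await().atMost(Duration.ofMinutes(2)) loop, so it blocks until the expected number of records has arrived and fails the test if that does not happen within two minutes.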