diff --git a/README.md b/README.md index f775c5c480..f183866fbd 100644 --- a/README.md +++ b/README.md @@ -41,4 +41,32 @@ jmx address: 192.168.50.224:15905 ================================================= ``` -Noted that the command to set up broker can be executed multiple times to create a broker cluster. \ No newline at end of file +Noted that the command to set up broker can be executed multiple times to create a broker cluster. + +# Kafka tools + +This project offers many kafka tools to simplify the life for kafka users. + +## Latency Benchmark + +This tool is used to test following latencies. +1. producer latency: the time of completing producer data request +2. E2E latency: the time for a record to travel through Kafka + +Run the benchmark from source code +```shell +./gradlew run --args="Latency --bootstrap.servers 192.168.50.224:18878" +``` + +Run the benchmark from release +```shell +./bin/App Latency --bootstrap.servers 192.168.50.224:18878 +``` + +### Latency Benchmark Configurations +1. --bootstrap.servers: the brokers addresses +2. --consumers: the number of consumers (threads). Default: 1 +3. --producers: the number of producers (threads). Default: 1 +4. --valueSize: the size of record value. Default: 100 bytes +5. --duration: the duration to run this benchmark. Default: 5 seconds +6. --flushDuration: the duration to flush producer records. Default: 2 seconds diff --git a/app/build.gradle b/app/build.gradle index 3e14b02446..70d1b73fcd 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -27,6 +27,7 @@ dependencies { application { // Define the main class for the application. mainClass = 'org.astraea.App' + applicationDefaultJvmArgs = ["-server", "-XX:+UseG1GC", "-Djava.awt.headless=true", "-Xms4G", "-Xmx4G"] } java { diff --git a/app/src/main/java/org/astraea/App.java b/app/src/main/java/org/astraea/App.java index eb8dd5d6f8..cf2d1faef1 100644 --- a/app/src/main/java/org/astraea/App.java +++ b/app/src/main/java/org/astraea/App.java @@ -4,12 +4,10 @@ import java.util.List; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.astraea.performance.latency.End2EndLatency; public class App { - private static final List> MAIN_CLASSES = - Arrays.asList( - // add the classes having main function - ); + private static final List> MAIN_CLASSES = Arrays.asList(End2EndLatency.class); private static String toString(List> mains) { return mains.stream().map(Class::getName).collect(Collectors.joining(",")); diff --git a/app/src/main/java/org/astraea/performance/latency/CloseableThread.java b/app/src/main/java/org/astraea/performance/latency/CloseableThread.java new file mode 100644 index 0000000000..baa3afb95e --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/CloseableThread.java @@ -0,0 +1,41 @@ +package org.astraea.performance.latency; + +import java.io.Closeable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +abstract class CloseableThread implements Runnable, Closeable { + private final AtomicBoolean closed = new AtomicBoolean(); + private final CountDownLatch closeLatch = new CountDownLatch(1); + + @Override + public final void run() { + try { + while (!closed.get()) execute(); + } catch (InterruptedException e) { + // swallow + } finally { + try { + cleanup(); + } finally { + closeLatch.countDown(); + } + } + } + + /** looped action. */ + abstract void execute() throws InterruptedException; + + /** final action when leaving loop. */ + void cleanup() {} + + @Override + public void close() { + closed.set(true); + try { + closeLatch.await(); + } catch (InterruptedException e) { + // swallow + } + } +} diff --git a/app/src/main/java/org/astraea/performance/latency/ComponentFactory.java b/app/src/main/java/org/astraea/performance/latency/ComponentFactory.java new file mode 100644 index 0000000000..fe0ac24baa --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/ComponentFactory.java @@ -0,0 +1,58 @@ +package org.astraea.performance.latency; + +import java.util.Properties; +import java.util.Set; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +interface ComponentFactory { + + /** + * create a factory based on kafka cluster. All components created by this factory will send + * request to kafka to get responses. + * + * @param brokers kafka broker addresses + * @param topics to subscribe + * @return a factory based on kafka + */ + static ComponentFactory fromKafka(String brokers, Set topics) { + + return new ComponentFactory() { + private final String groupId = "group-id-" + System.currentTimeMillis(); + + @Override + public Producer producer() { + var props = new Properties(); + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + return Producer.fromKafka(props); + } + + @Override + public Consumer createConsumer() { + var props = new Properties(); + props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers); + // all consumers are in same group, so there is no duplicate data in read workload. + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + return Consumer.fromKafka(props, topics); + } + + @Override + public TopicAdmin createTopicAdmin() { + var props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + return TopicAdmin.fromKafka(props); + } + }; + } + + /** @return a new producer. Please close it when you don't need it. */ + Producer producer(); + + /** @return a new consumer. Please close it when you don't need it */ + Consumer createConsumer(); + + /** @return a new topic admin. Please close it when you don't need it. */ + TopicAdmin createTopicAdmin(); +} diff --git a/app/src/main/java/org/astraea/performance/latency/Consumer.java b/app/src/main/java/org/astraea/performance/latency/Consumer.java new file mode 100644 index 0000000000..0bae78476c --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/Consumer.java @@ -0,0 +1,45 @@ +package org.astraea.performance.latency; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Properties; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +interface Consumer extends Closeable { + Duration POLL_TIMEOUT = Duration.ofMillis(500); + + static Consumer fromKafka(Properties props, Set topics) { + var kafkaConsumer = + new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + kafkaConsumer.subscribe(topics); + return new Consumer() { + + @Override + public ConsumerRecords poll() { + return kafkaConsumer.poll(POLL_TIMEOUT); + } + + @Override + public void wakeup() { + kafkaConsumer.wakeup(); + } + + @Override + public void close() { + kafkaConsumer.close(); + } + }; + } + + /** see {@link KafkaConsumer#poll(Duration)} */ + ConsumerRecords poll(); + + /** see {@link KafkaConsumer#wakeup()} */ + default void wakeup() {} + + @Override + default void close() {} +} diff --git a/app/src/main/java/org/astraea/performance/latency/ConsumerThread.java b/app/src/main/java/org/astraea/performance/latency/ConsumerThread.java new file mode 100644 index 0000000000..2a04c04dd2 --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/ConsumerThread.java @@ -0,0 +1,46 @@ +package org.astraea.performance.latency; + +import java.util.Objects; + +class ConsumerThread extends CloseableThread { + + private final Consumer consumer; + private final DataManager dataManager; + private final MeterTracker tracker; + + ConsumerThread(DataManager dataManager, MeterTracker tracker, Consumer consumer) { + this.dataManager = Objects.requireNonNull(dataManager); + this.consumer = Objects.requireNonNull(consumer); + this.tracker = Objects.requireNonNull(tracker); + } + + @Override + public void execute() throws InterruptedException { + try { + var now = System.currentTimeMillis(); + var records = consumer.poll(); + records.forEach( + record -> { + var entry = dataManager.removeSendingRecord(record.key()); + var latency = now - entry.getValue(); + var produceRecord = entry.getKey(); + if (!KafkaUtils.equal(produceRecord, record)) + System.out.println("receive corrupt data!!!"); + else tracker.record(record.serializedKeySize() + record.serializedValueSize(), latency); + }); + } catch (org.apache.kafka.common.errors.WakeupException e) { + throw new InterruptedException(e.getMessage()); + } + } + + @Override + void cleanup() { + consumer.close(); + } + + @Override + public void close() { + consumer.wakeup(); + super.close(); + } +} diff --git a/app/src/main/java/org/astraea/performance/latency/DataManager.java b/app/src/main/java/org/astraea/performance/latency/DataManager.java new file mode 100644 index 0000000000..128a72d3ae --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/DataManager.java @@ -0,0 +1,74 @@ +package org.astraea.performance.latency; + +import java.util.*; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.clients.producer.ProducerRecord; + +class DataManager { + + static DataManager of(String topic, int valueSize) { + return new DataManager(topic, valueSize, true); + } + + static DataManager noConsumer(String topic, int valueSize) { + return new DataManager(topic, valueSize, false); + } + + /** record key -> (record, timestamp_done) */ + private final ConcurrentMap, Long>> + sendingRecords = new ConcurrentSkipListMap<>(Arrays::compare); + + private final String topic; + private final int valueSize; + private final boolean hasConsumer; + + private final AtomicLong recordIndex = new AtomicLong(0); + + final AtomicLong producerRecords = new AtomicLong(0); + + private DataManager(String topic, int valueSize, boolean hasConsumer) { + this.topic = Objects.requireNonNull(topic); + this.valueSize = valueSize; + this.hasConsumer = hasConsumer; + } + + /** + * @return generate a new record with random data and specify topic name. The record consists of + * key, value, topic and header. + */ + ProducerRecord producerRecord() { + var content = String.valueOf(recordIndex.getAndIncrement()); + var rawContent = content.getBytes(); + var headers = Collections.singletonList(KafkaUtils.header(content, rawContent)); + return new ProducerRecord<>(topic, null, rawContent, new byte[valueSize], headers); + } + + void sendingRecord(ProducerRecord record, long now) { + if (hasConsumer) { + var previous = + sendingRecords.put(record.key(), new AbstractMap.SimpleImmutableEntry<>(record, now)); + if (previous != null) throw new RuntimeException("duplicate data!!!"); + } + producerRecords.incrementAndGet(); + } + + /** + * get sending record + * + * @param key of completed record + * @return the completed record. Or NullPointerException if there is no related record. + */ + Map.Entry, Long> removeSendingRecord(byte[] key) { + if (!hasConsumer) + throw new UnsupportedOperationException( + "removeSendingRecord is unsupported when there is no consumer"); + return Objects.requireNonNull(sendingRecords.remove(key)); + } + + /** @return number of completed records */ + long numberOfProducerRecords() { + return producerRecords.get(); + } +} diff --git a/app/src/main/java/org/astraea/performance/latency/End2EndLatency.java b/app/src/main/java/org/astraea/performance/latency/End2EndLatency.java new file mode 100644 index 0000000000..ad2e422ab6 --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/End2EndLatency.java @@ -0,0 +1,159 @@ +package org.astraea.performance.latency; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class End2EndLatency { + + static final String BROKERS_KEY = "--bootstrap.servers"; + static final String TOPIC_KEY = "--topic"; + static final String PRODUCERS_KEY = "--producers"; + static final String CONSUMERS_KEY = "--consumers"; + static final String DURATION_KEY = "--duration"; + static final String VALUE_SIZE_KEY = "--valueSize"; + static final String FLUSH_DURATION_KEY = "--flushDuration"; + + private static IllegalArgumentException exception(String key, Object incorrectValue) { + return new IllegalArgumentException( + "the value: " + incorrectValue + " is illegal. Please use another value for " + key); + } + + static class Parameters { + final String brokers; + final String topic; + final int numberOfProducers; + final int numberOfConsumers; + final Duration duration; + final int valueSize; + final Duration flushDuration; + + Parameters( + String brokers, + String topic, + int numberOfProducers, + int numberOfConsumers, + Duration duration, + int valueSize, + Duration flushDuration) { + this.brokers = Objects.requireNonNull(brokers); + this.topic = Objects.requireNonNull(topic); + this.numberOfProducers = numberOfProducers; + this.numberOfConsumers = numberOfConsumers; + this.duration = duration; + this.valueSize = valueSize; + this.flushDuration = flushDuration; + + if (brokers.isBlank()) throw exception(BROKERS_KEY, brokers); + if (topic.isBlank()) throw exception(TOPIC_KEY, topic); + if (numberOfProducers <= 0) throw exception(PRODUCERS_KEY, numberOfProducers); + if (numberOfConsumers < 0) throw exception(CONSUMERS_KEY, numberOfConsumers); + if (duration.toMillis() <= 0) throw exception(DURATION_KEY, duration); + if (flushDuration.toMillis() <= 0) throw exception(FLUSH_DURATION_KEY, flushDuration); + if (valueSize <= 0) throw exception(VALUE_SIZE_KEY, valueSize); + } + } + + static Parameters parameters(String[] args) { + // group arguments by key + var argMap = new HashMap(); + for (var i = 0; i <= args.length; i += 2) { + if (i + 1 >= args.length) break; + argMap.put(args[i], args[i + 1]); + } + + // take values from legal keys. We just ignore the invalid configs. + return new Parameters( + argMap.getOrDefault(BROKERS_KEY, ""), + argMap.getOrDefault(TOPIC_KEY, "testLatency-" + System.currentTimeMillis()), + Integer.parseInt(argMap.getOrDefault(PRODUCERS_KEY, "1")), + Integer.parseInt(argMap.getOrDefault(CONSUMERS_KEY, "1")), + Duration.ofSeconds(Long.parseLong(argMap.getOrDefault(DURATION_KEY, "5"))), + Integer.parseInt(argMap.getOrDefault(VALUE_SIZE_KEY, "100")), + Duration.ofSeconds(Long.parseLong(argMap.getOrDefault(FLUSH_DURATION_KEY, "2")))); + } + + public static void main(String[] args) throws Exception { + var parameters = parameters(args); + + System.out.println("brokers: " + parameters.brokers); + System.out.println("topic: " + parameters.topic); + System.out.println("numberOfProducers: " + parameters.numberOfProducers); + System.out.println("numberOfConsumers: " + parameters.numberOfConsumers); + System.out.println("duration: " + parameters.duration.toSeconds() + " seconds"); + System.out.println("value size: " + parameters.valueSize + " bytes"); + System.out.println("flush duration: " + parameters.flushDuration.toSeconds() + " seconds"); + + try (var closeFlag = + execute( + ComponentFactory.fromKafka(parameters.brokers, Collections.singleton(parameters.topic)), + parameters)) { + TimeUnit.MILLISECONDS.sleep(parameters.duration.toMillis()); + } + } + + static AutoCloseable execute(ComponentFactory factory, Parameters parameters) throws Exception { + var consumerTracker = new MeterTracker("consumer latency"); + var producerTracker = new MeterTracker("producer latency"); + var dataManager = + parameters.numberOfConsumers <= 0 + ? DataManager.noConsumer(parameters.topic, parameters.valueSize) + : DataManager.of(parameters.topic, parameters.valueSize); + + // create producer threads + var producers = + IntStream.range(0, parameters.numberOfProducers) + .mapToObj( + i -> + new ProducerThread( + dataManager, producerTracker, factory.producer(), parameters.flushDuration)) + .collect(Collectors.toList()); + + // create consumers threads + var consumers = + IntStream.range(0, parameters.numberOfConsumers) + .mapToObj( + i -> new ConsumerThread(dataManager, consumerTracker, factory.createConsumer())) + .collect(Collectors.toList()); + + // + 2 for latency trackers + var services = + Executors.newFixedThreadPool( + parameters.numberOfProducers + parameters.numberOfConsumers + 2); + + AutoCloseable releaseAllObjects = + () -> { + producerTracker.close(); + consumerTracker.close(); + producers.forEach(ProducerThread::close); + consumers.forEach(ConsumerThread::close); + services.shutdownNow(); + if (!services.awaitTermination(30, TimeUnit.SECONDS)) { + System.out.println("timeout to wait all threads"); + } + }; + try { + try (var topicAdmin = factory.createTopicAdmin()) { + // the number of partitions is equal to number of consumers. That make each consumer can + // consume a part of topic. + KafkaUtils.createTopicIfNotExist( + topicAdmin, + parameters.topic, + parameters.numberOfConsumers <= 0 ? 1 : parameters.numberOfConsumers); + } + consumers.forEach(services::execute); + producers.forEach(services::execute); + services.execute(consumerTracker); + services.execute(producerTracker); + return releaseAllObjects; + } catch (Exception e) { + releaseAllObjects.close(); + throw e; + } + } +} diff --git a/app/src/main/java/org/astraea/performance/latency/KafkaUtils.java b/app/src/main/java/org/astraea/performance/latency/KafkaUtils.java new file mode 100644 index 0000000000..66f2ca7500 --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/KafkaUtils.java @@ -0,0 +1,53 @@ +package org.astraea.performance.latency; + +import java.util.*; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; + +final class KafkaUtils { + + static Header header(String key, byte[] value) { + return new RecordHeader(key, value); + } + + static boolean equal(Iterable
lhs, Iterable
rhs) { + Comparator
comparator = + (o1, o2) -> { + var r = o1.key().compareTo(o2.key()); + if (r != 0) return r; + else return Arrays.compare(o1.value(), o2.value()); + }; + var lhsList = new ArrayList
(); + lhs.forEach(lhsList::add); + var rhsList = new ArrayList
(); + rhs.forEach(rhsList::add); + lhsList.sort(comparator); + rhsList.sort(comparator); + return lhsList.equals(rhsList); + } + + static boolean equal( + ProducerRecord producerRecord, + ConsumerRecord consumerRecord) { + if (!Arrays.equals(producerRecord.key(), consumerRecord.key())) return false; + if (!Arrays.equals(producerRecord.value(), consumerRecord.value())) return false; + if (!producerRecord.topic().equals(consumerRecord.topic())) return false; + return equal(producerRecord.headers(), consumerRecord.headers()); + } + + static void createTopicIfNotExist(TopicAdmin adminClient, String name, int numberOfPartitions) { + try { + if (!adminClient.listTopics().contains(name)) + adminClient.createTopics( + Collections.singletonList( + new NewTopic(name, Optional.of(numberOfPartitions), Optional.empty()))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private KafkaUtils() {} +} diff --git a/app/src/main/java/org/astraea/performance/latency/MeterTracker.java b/app/src/main/java/org/astraea/performance/latency/MeterTracker.java new file mode 100644 index 0000000000..86d3d0aeeb --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/MeterTracker.java @@ -0,0 +1,78 @@ +package org.astraea.performance.latency; + +import java.util.concurrent.TimeUnit; + +class MeterTracker extends CloseableThread { + private final long startTime = System.currentTimeMillis(); + private final String name; + private long bytes = 0; + private long count = 0; + private double avgLatency = 0; + private long maxLatency = 0; + + MeterTracker(String name) { + this.name = name; + } + + synchronized void record(long bytes, long latency) { + maxLatency = Math.max(maxLatency, latency); + avgLatency = (avgLatency * count + latency) / (count + 1); + count += 1; + this.bytes += bytes; + } + + synchronized long count() { + return count; + } + + synchronized long bytes() { + return bytes; + } + + synchronized double averageLatency() { + return avgLatency; + } + + synchronized long maxLatency() { + return maxLatency; + } + + synchronized double throughput() { + var second = (System.currentTimeMillis() - startTime) / 1000; + return ((double) bytes / second); + } + + private static double round(double value) { + return Math.round(value * 100) / (double) 100; + } + + @Override + public synchronized String toString() { + return " records: " + + count + + " throughput: " + + round(throughput() / 1024 / 1024) + + " MB/s" + + " avg_latency: " + + round(avgLatency) + + " ms" + + " max_latency: " + + maxLatency + + " ms"; + } + + @Override + void execute() { + try { + TimeUnit.SECONDS.sleep(2); + String output; + synchronized (this) { + if (count == 0) return; + else output = "[" + name + "] " + this; + } + System.out.println(output); + } catch (InterruptedException e) { + // swallow + } + } +} diff --git a/app/src/main/java/org/astraea/performance/latency/Producer.java b/app/src/main/java/org/astraea/performance/latency/Producer.java new file mode 100644 index 0000000000..9e16de32f5 --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/Producer.java @@ -0,0 +1,51 @@ +package org.astraea.performance.latency; + +import java.io.Closeable; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +interface Producer extends Closeable { + + static Producer fromKafka(Properties props) { + var kafkaProducer = + new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()); + return new Producer() { + + @Override + public CompletionStage send(ProducerRecord record) { + var f = new CompletableFuture(); + kafkaProducer.send( + record, + (r, e) -> { + if (e != null) f.completeExceptionally(e); + else f.complete(r); + }); + return f; + } + + @Override + public void flush() { + kafkaProducer.flush(); + } + + @Override + public void close() { + kafkaProducer.close(); + } + }; + } + + /** see {@link KafkaProducer#send(ProducerRecord)} */ + CompletionStage send(ProducerRecord record); + + /** see {@link KafkaProducer#flush()}} */ + default void flush() {} + + @Override + default void close() {} +} diff --git a/app/src/main/java/org/astraea/performance/latency/ProducerThread.java b/app/src/main/java/org/astraea/performance/latency/ProducerThread.java new file mode 100644 index 0000000000..f3eb6ac196 --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/ProducerThread.java @@ -0,0 +1,47 @@ +package org.astraea.performance.latency; + +import java.time.Duration; +import java.util.Objects; + +class ProducerThread extends CloseableThread { + private final Producer producer; + private final DataManager dataManager; + private final MeterTracker tracker; + private final Duration flushDuration; + private long lastSend = 0; + + ProducerThread( + DataManager dataManager, MeterTracker tracker, Producer producer, Duration flushDuration) { + this.dataManager = Objects.requireNonNull(dataManager); + this.producer = Objects.requireNonNull(producer); + this.tracker = Objects.requireNonNull(tracker); + this.flushDuration = Objects.requireNonNull(flushDuration); + } + + @Override + void execute() { + var data = dataManager.producerRecord(); + var now = System.currentTimeMillis(); + dataManager.sendingRecord(data, now); + producer + .send(data) + .whenComplete( + (r, e) -> { + if (e != null) tracker.record(0, System.currentTimeMillis() - now); + else + tracker.record( + r.serializedKeySize() + r.serializedValueSize(), + System.currentTimeMillis() - now); + }); + if (lastSend <= 0) lastSend = now; + else if (lastSend + flushDuration.toMillis() < now) { + lastSend = now; + producer.flush(); + } + } + + @Override + void cleanup() { + producer.close(); + } +} diff --git a/app/src/main/java/org/astraea/performance/latency/TopicAdmin.java b/app/src/main/java/org/astraea/performance/latency/TopicAdmin.java new file mode 100644 index 0000000000..f2aadd5e4c --- /dev/null +++ b/app/src/main/java/org/astraea/performance/latency/TopicAdmin.java @@ -0,0 +1,50 @@ +package org.astraea.performance.latency; + +import java.io.Closeable; +import java.util.Collection; +import java.util.Properties; +import java.util.Set; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; + +interface TopicAdmin extends Closeable { + + static TopicAdmin fromKafka(Properties props) { + var adminClient = Admin.create(props); + + return new TopicAdmin() { + + @Override + public Set listTopics() { + try { + return adminClient.listTopics().names().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void createTopics(Collection newTopics) { + try { + adminClient.createTopics(newTopics).all().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + adminClient.close(); + } + }; + } + + /** see {@link Admin#listTopics()} */ + Set listTopics(); + + /** see {@link Admin#createTopics(Collection)} */ + void createTopics(Collection newTopics); + + @Override + default void close() {} +} diff --git a/app/src/test/java/org/astraea/performance/latency/CloseableThreadTest.java b/app/src/test/java/org/astraea/performance/latency/CloseableThreadTest.java new file mode 100644 index 0000000000..27caa35590 --- /dev/null +++ b/app/src/test/java/org/astraea/performance/latency/CloseableThreadTest.java @@ -0,0 +1,41 @@ +package org.astraea.performance.latency; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class CloseableThreadTest { + + @Test + void testAllActions() throws InterruptedException { + var executeCount = new AtomicInteger(); + var cleanupCount = new AtomicInteger(); + var thread = + new CloseableThread() { + + @Override + void execute() { + executeCount.incrementAndGet(); + } + + @Override + void cleanup() { + cleanupCount.incrementAndGet(); + } + }; + + var service = Executors.newSingleThreadExecutor(); + service.execute(thread); + + TimeUnit.SECONDS.sleep(1); + + thread.close(); + service.shutdown(); + Assertions.assertTrue(service.awaitTermination(10, TimeUnit.SECONDS)); + + Assertions.assertTrue(executeCount.get() > 0); + Assertions.assertEquals(1, cleanupCount.get()); + } +} diff --git a/app/src/test/java/org/astraea/performance/latency/ConsumerThreadTest.java b/app/src/test/java/org/astraea/performance/latency/ConsumerThreadTest.java new file mode 100644 index 0000000000..9d0734890f --- /dev/null +++ b/app/src/test/java/org/astraea/performance/latency/ConsumerThreadTest.java @@ -0,0 +1,44 @@ +package org.astraea.performance.latency; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class ConsumerThreadTest { + private final String topic = "topic-" + System.currentTimeMillis(); + private final DataManager dataManager = DataManager.of(topic, 10); + + @Test + void testExecute() throws InterruptedException { + var count = new AtomicInteger(0); + var producerRecord = dataManager.producerRecord(); + var consumerRecord = FakeComponentFactory.toConsumerRecord(producerRecord); + try (Consumer consumer = + () -> { + count.incrementAndGet(); + return new ConsumerRecords<>( + Collections.singletonMap( + new TopicPartition(topic, 1), Collections.singletonList(consumerRecord))); + }) { + var tracker = new MeterTracker("test consumer"); + Assertions.assertEquals(0, tracker.maxLatency()); + Assertions.assertEquals(0, tracker.averageLatency()); + + var thread = new ConsumerThread(dataManager, tracker, consumer); + dataManager.sendingRecord(producerRecord, System.currentTimeMillis()); + + // add the latency + TimeUnit.SECONDS.sleep(1); + thread.execute(); + Assertions.assertEquals(1, count.get()); + Assertions.assertEquals(1, dataManager.numberOfProducerRecords()); + Assertions.assertEquals(1, tracker.count()); + Assertions.assertNotEquals(0, tracker.maxLatency()); + Assertions.assertNotEquals(0, tracker.averageLatency()); + } + } +} diff --git a/app/src/test/java/org/astraea/performance/latency/DataManagerTest.java b/app/src/test/java/org/astraea/performance/latency/DataManagerTest.java new file mode 100644 index 0000000000..415d60e612 --- /dev/null +++ b/app/src/test/java/org/astraea/performance/latency/DataManagerTest.java @@ -0,0 +1,30 @@ +package org.astraea.performance.latency; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class DataManagerTest { + private final String topic = "topic-" + String.valueOf(System.currentTimeMillis()); + private final DataManager dataManager = DataManager.of(topic, 10); + + @Test + void testTakeRecord() { + var record = dataManager.producerRecord(); + Assertions.assertEquals(topic, record.topic()); + Assertions.assertNotNull(record.key()); + Assertions.assertNotNull(record.value()); + Assertions.assertNotNull(record.headers()); + } + + @Test + void testCompleteRecord() { + Assertions.assertEquals(0, dataManager.numberOfProducerRecords()); + var record = dataManager.producerRecord(); + dataManager.sendingRecord(record, System.currentTimeMillis()); + Assertions.assertEquals(1, dataManager.numberOfProducerRecords()); + Assertions.assertEquals(record, dataManager.removeSendingRecord(record.key()).getKey()); + + Assertions.assertThrows( + NullPointerException.class, () -> dataManager.removeSendingRecord("aa".getBytes())); + } +} diff --git a/app/src/test/java/org/astraea/performance/latency/End2EndLatencyTest.java b/app/src/test/java/org/astraea/performance/latency/End2EndLatencyTest.java new file mode 100644 index 0000000000..60d702bb1c --- /dev/null +++ b/app/src/test/java/org/astraea/performance/latency/End2EndLatencyTest.java @@ -0,0 +1,136 @@ +package org.astraea.performance.latency; + +import java.time.Duration; +import java.util.concurrent.*; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class End2EndLatencyTest { + + @Test + void testIncorrectParameters() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> End2EndLatency.parameters(new String[] {End2EndLatency.BROKERS_KEY, ""})); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + End2EndLatency.parameters( + new String[] { + End2EndLatency.BROKERS_KEY, "localhost:11111", + End2EndLatency.TOPIC_KEY, "" + })); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + End2EndLatency.parameters( + new String[] { + End2EndLatency.BROKERS_KEY, "localhost:11111", + End2EndLatency.DURATION_KEY, "-1" + })); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + End2EndLatency.parameters( + new String[] { + End2EndLatency.BROKERS_KEY, "localhost:11111", + End2EndLatency.FLUSH_DURATION_KEY, "-1" + })); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + End2EndLatency.parameters( + new String[] { + End2EndLatency.BROKERS_KEY, "localhost:11111", + End2EndLatency.PRODUCERS_KEY, "-1" + })); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + End2EndLatency.parameters( + new String[] { + End2EndLatency.BROKERS_KEY, "localhost:11111", + End2EndLatency.PRODUCERS_KEY, "0" + })); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + End2EndLatency.parameters( + new String[] { + End2EndLatency.BROKERS_KEY, "localhost:11111", + End2EndLatency.CONSUMERS_KEY, "-1" + })); + + Assertions.assertEquals( + 0, + End2EndLatency.parameters( + new String[] { + End2EndLatency.BROKERS_KEY, "localhost:11111", + End2EndLatency.CONSUMERS_KEY, "0" + }) + .numberOfConsumers); + } + + @Test + void testParameters() { + var brokers = "broker00:12345"; + var topic = "topic"; + var numberOfProducers = 100; + var numberOfConsumers = 10; + var duration = Duration.ofSeconds(10); + var valueSize = 888; + var flushDuration = Duration.ofSeconds(3); + var parameters = + End2EndLatency.parameters( + new String[] { + End2EndLatency.BROKERS_KEY, + brokers, + End2EndLatency.CONSUMERS_KEY, + String.valueOf(numberOfConsumers), + End2EndLatency.DURATION_KEY, + String.valueOf(duration.toSeconds()), + End2EndLatency.PRODUCERS_KEY, + String.valueOf(numberOfProducers), + End2EndLatency.TOPIC_KEY, + topic, + End2EndLatency.VALUE_SIZE_KEY, + String.valueOf(valueSize), + End2EndLatency.FLUSH_DURATION_KEY, + String.valueOf(flushDuration.toSeconds()) + }); + Assertions.assertEquals(brokers, parameters.brokers); + Assertions.assertEquals(topic, parameters.topic); + Assertions.assertEquals(numberOfConsumers, parameters.numberOfConsumers); + Assertions.assertEquals(numberOfProducers, parameters.numberOfProducers); + Assertions.assertEquals(duration, parameters.duration); + Assertions.assertEquals(valueSize, parameters.valueSize); + Assertions.assertEquals(flushDuration, parameters.flushDuration); + } + + @Test + void testExecute() throws Exception { + var factory = new FakeComponentFactory(); + try (var r = + End2EndLatency.execute( + factory, + new End2EndLatency.Parameters( + "brokers", "topic", 1, 1, Duration.ofSeconds(1), 10, Duration.ofSeconds(1)))) { + TimeUnit.SECONDS.sleep(2); + } + + // check producers count + Assertions.assertTrue(factory.producerSendCount.get() > 0); + Assertions.assertTrue(factory.producerFlushCount.get() > 0); + Assertions.assertEquals(1, factory.producerCloseCount.get()); + + // check consumer count + Assertions.assertTrue(factory.consumerPoolCount.get() > 0); + Assertions.assertEquals(1, factory.consumerWakeupCount.get()); + Assertions.assertEquals(1, factory.consumerCloseCount.get()); + + // check admin topic count + Assertions.assertEquals(1, factory.topicAdminListCount.get()); + Assertions.assertEquals(1, factory.topicAdminCloseCount.get()); + Assertions.assertEquals(1, factory.topicAdminCreateCount.get()); + } +} diff --git a/app/src/test/java/org/astraea/performance/latency/FakeComponentFactory.java b/app/src/test/java/org/astraea/performance/latency/FakeComponentFactory.java new file mode 100644 index 0000000000..60f1e0cdfa --- /dev/null +++ b/app/src/test/java/org/astraea/performance/latency/FakeComponentFactory.java @@ -0,0 +1,138 @@ +package org.astraea.performance.latency; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; + +class FakeComponentFactory implements ComponentFactory { + + static ConsumerRecord toConsumerRecord( + ProducerRecord producerRecord) { + return new ConsumerRecord<>( + producerRecord.topic(), + 1, + 1L, + 1L, + TimestampType.CREATE_TIME, + 1L, + producerRecord.key() == null ? 0 : producerRecord.key().length, + producerRecord.value() == null ? 0 : producerRecord.value().length, + producerRecord.key(), + producerRecord.value(), + new RecordHeaders(producerRecord.headers())); + } + + final AtomicInteger producerSendCount = new AtomicInteger(); + final AtomicInteger producerFlushCount = new AtomicInteger(); + final AtomicInteger producerCloseCount = new AtomicInteger(); + + final AtomicInteger consumerPoolCount = new AtomicInteger(); + final AtomicInteger consumerWakeupCount = new AtomicInteger(); + final AtomicInteger consumerCloseCount = new AtomicInteger(); + + final AtomicInteger topicAdminListCount = new AtomicInteger(); + final AtomicInteger topicAdminCreateCount = new AtomicInteger(); + final AtomicInteger topicAdminCloseCount = new AtomicInteger(); + + /** save all records from producer */ + final BlockingQueue> allRecords = new LinkedBlockingQueue<>(); + + /** save non-consumed records */ + final BlockingQueue> records = new LinkedBlockingQueue<>(); + + @Override + public Producer producer() { + return new Producer() { + + @Override + public CompletionStage send(ProducerRecord record) { + producerSendCount.incrementAndGet(); + allRecords.add(record); + records.add(record); + return CompletableFuture.completedFuture( + new RecordMetadata( + new TopicPartition(record.topic(), 1), + 1L, + 1L, + 1L, + 1L, + record.key() == null ? 0 : record.key().length, + record.value() == null ? 0 : record.value().length)); + } + + @Override + public void flush() { + producerFlushCount.incrementAndGet(); + } + + @Override + public void close() { + producerCloseCount.incrementAndGet(); + } + }; + } + + @Override + public Consumer createConsumer() { + return new Consumer() { + + @Override + public ConsumerRecords poll() { + consumerPoolCount.incrementAndGet(); + try { + var producerRecord = records.poll(1, TimeUnit.SECONDS); + if (producerRecord == null) return new ConsumerRecords<>(Collections.emptyMap()); + return new ConsumerRecords<>( + Collections.singletonMap( + new TopicPartition(producerRecord.topic(), 1), + Collections.singletonList(toConsumerRecord(producerRecord)))); + } catch (InterruptedException e) { + throw new WakeupException(); + } + } + + @Override + public void wakeup() { + consumerWakeupCount.incrementAndGet(); + } + + @Override + public void close() { + consumerCloseCount.incrementAndGet(); + } + }; + } + + @Override + public TopicAdmin createTopicAdmin() { + return new TopicAdmin() { + + @Override + public Set listTopics() { + topicAdminListCount.incrementAndGet(); + return Collections.emptySet(); + } + + @Override + public void createTopics(Collection newTopics) { + topicAdminCreateCount.incrementAndGet(); + } + + @Override + public void close() { + topicAdminCloseCount.incrementAndGet(); + } + }; + } +} diff --git a/app/src/test/java/org/astraea/performance/latency/KafkaUtilsTest.java b/app/src/test/java/org/astraea/performance/latency/KafkaUtilsTest.java new file mode 100644 index 0000000000..f42ede5607 --- /dev/null +++ b/app/src/test/java/org/astraea/performance/latency/KafkaUtilsTest.java @@ -0,0 +1,85 @@ +package org.astraea.performance.latency; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class KafkaUtilsTest { + + @Test + void testCreateTopic() { + var numberOfPartitions = 1000; + var existentTopic = "existentTopic"; + var count = new AtomicInteger(); + var topicAdmin = + new TopicAdmin() { + + @Override + public Set listTopics() { + return Collections.singleton(existentTopic); + } + + @Override + public void createTopics(Collection newTopics) { + count.incrementAndGet(); + Assertions.assertEquals(1, newTopics.size()); + Assertions.assertEquals( + numberOfPartitions, newTopics.iterator().next().numPartitions()); + } + }; + KafkaUtils.createTopicIfNotExist(topicAdmin, existentTopic, 10); + Assertions.assertEquals(0, count.get()); + + KafkaUtils.createTopicIfNotExist(topicAdmin, "nonexistent", numberOfPartitions); + Assertions.assertEquals(1, count.get()); + } + + @Test + void testCreateHeader() { + var header = KafkaUtils.header("a", "a".getBytes()); + Assertions.assertEquals("a", header.key()); + Assertions.assertArrayEquals("a".getBytes(), header.value()); + } + + @Test + void testEqualOfHeader() { + var headers0 = + IntStream.range(0, 5) + .mapToObj(i -> KafkaUtils.header(String.valueOf(i), String.valueOf(i).getBytes())) + .collect(Collectors.toList()); + var headers1 = + IntStream.range(0, 5) + .mapToObj(i -> KafkaUtils.header(String.valueOf(i), String.valueOf(i).getBytes())) + .collect(Collectors.toList()); + + // reorder + IntStream.range(0, 10) + .forEach( + i -> { + Collections.shuffle(headers1); + Collections.shuffle(headers0); + Assertions.assertTrue(KafkaUtils.equal(headers0, headers0)); + Assertions.assertTrue(KafkaUtils.equal(headers1, headers1)); + Assertions.assertTrue(KafkaUtils.equal(headers0, headers1)); + Assertions.assertTrue(KafkaUtils.equal(headers1, headers0)); + }); + } + + @Test + void testEqualOfRecord() { + var topic = "topic"; + var key = "key".getBytes(); + var value = "value".getBytes(); + var headers = Collections.singleton(KafkaUtils.header("a", "b".getBytes())); + var producerRecord = new ProducerRecord<>(topic, null, key, value, headers); + var consumerRecord = FakeComponentFactory.toConsumerRecord(producerRecord); + Assertions.assertTrue(KafkaUtils.equal(producerRecord, consumerRecord)); + } +} diff --git a/app/src/test/java/org/astraea/performance/latency/MeterTrackerTest.java b/app/src/test/java/org/astraea/performance/latency/MeterTrackerTest.java new file mode 100644 index 0000000000..235343cb81 --- /dev/null +++ b/app/src/test/java/org/astraea/performance/latency/MeterTrackerTest.java @@ -0,0 +1,21 @@ +package org.astraea.performance.latency; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class MeterTrackerTest { + + @Test + void testRecord() { + var tracker = new MeterTracker("test tracker"); + Assertions.assertEquals(0, tracker.count()); + Assertions.assertEquals(0, tracker.maxLatency()); + Assertions.assertEquals(0, tracker.averageLatency()); + + tracker.record(10, 10); + Assertions.assertEquals(1, tracker.count()); + Assertions.assertEquals(10, tracker.bytes()); + Assertions.assertEquals(10, tracker.maxLatency()); + Assertions.assertEquals(10, tracker.averageLatency()); + } +} diff --git a/app/src/test/java/org/astraea/performance/latency/ProducerThreadTest.java b/app/src/test/java/org/astraea/performance/latency/ProducerThreadTest.java new file mode 100644 index 0000000000..a4cf476973 --- /dev/null +++ b/app/src/test/java/org/astraea/performance/latency/ProducerThreadTest.java @@ -0,0 +1,32 @@ +package org.astraea.performance.latency; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class ProducerThreadTest { + private final String topic = "topic-" + String.valueOf(System.currentTimeMillis()); + private final DataManager dataManager = DataManager.of(topic, 10); + + @Test + void testExecute() { + var count = new AtomicInteger(0); + try (Producer producer = + record -> { + count.incrementAndGet(); + return CompletableFuture.completedFuture( + new RecordMetadata(new TopicPartition(record.topic(), 1), 1L, 1L, 1L, 1L, 1, 1)); + }) { + var thread = + new ProducerThread( + dataManager, new MeterTracker("test producer"), producer, Duration.ZERO); + thread.execute(); + Assertions.assertEquals(1, count.get()); + Assertions.assertEquals(1, dataManager.numberOfProducerRecords()); + } + } +}