From 9295204fd297a951a6ad30f1df1c99ac38d0131c Mon Sep 17 00:00:00 2001 From: rodesai Date: Mon, 19 Aug 2019 11:07:49 -0700 Subject: [PATCH] feat: enhance datagen for use as a load generator Resurrecting some ancient enhancements to datagen so that we can use it to generate load: - Add a flag to disable printing each row - Add a flag to control the number of threads producing data - Add a flag to control the total message rate (msgs/second) across all the threads. The rate limiting is implemented using a token bucket. --- .../io/confluent/ksql/datagen/DataGen.java | 181 +++++++++++++++--- .../ksql/datagen/DataGenProducer.java | 24 ++- .../confluent/ksql/datagen/DataGenTest.java | 5 +- 3 files changed, 176 insertions(+), 34 deletions(-) diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java index 6adf47d04c5c..12e13d208d8a 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java @@ -16,19 +16,24 @@ package io.confluent.ksql.datagen; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.RateLimiter; import io.confluent.avro.random.generator.Generator; import io.confluent.ksql.serde.Format; import io.confluent.ksql.util.KsqlConfig; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Random; import java.util.function.BiConsumer; +import java.util.function.Supplier; public final class DataGen { @@ -58,19 +63,48 @@ static void run(final String... 
args) throws IOException { return; } - final Generator generator = new Generator(arguments.schemaFile, new Random()); final Properties props = getProperties(arguments); final DataGenProducer dataProducer = ProducerFactory .getProducer(arguments.keyFormat, arguments.valueFormat, props); + final Optional rateLimiter = arguments.msgRate != -1 + ? Optional.of(RateLimiter.create(arguments.msgRate)) : Optional.empty(); - dataProducer.populateTopic( - props, - generator, - arguments.topicName, - arguments.keyName, - arguments.iterations, - arguments.maxInterval - ); + final List threads = new LinkedList<>(); + for (int i = 0; i < arguments.numThreads; i++) { + threads.add(startProducerThread(arguments, dataProducer, props, rateLimiter)); + } + + for (final Thread t : threads) { + try { + t.join(); + } catch (InterruptedException e) { + // interrupted. exit with error + System.exit(1); + } + } + } + + private static Thread startProducerThread( + final Arguments arguments, + final DataGenProducer dataProducer, + final Properties props, + final Optional rateLimiter) throws IOException { + final Generator generator = new Generator(arguments.schemaFile.get(), new Random()); + final Thread t = new Thread(() -> { + dataProducer.populateTopic( + props, + generator, + arguments.topicName, + arguments.keyName, + arguments.iterations, + arguments.maxInterval, + arguments.printRows, + rateLimiter + ); + }); + t.setDaemon(true); + t.start(); + return t; } static Properties getProperties(final Arguments arguments) throws IOException { @@ -105,7 +139,10 @@ private static void usage() { + "key= " + newLine + "[iterations= (defaults to 1,000,000)] " + newLine + "[maxInterval= (defaults to 500)] " + newLine - + "[propertiesFile=]" + newLine + + "[propertiesFile=] " + newLine + + "[nThreads=] " + newLine + + "[msgRate=] " + newLine + + "[printRows=]" + newLine ); } @@ -113,7 +150,7 @@ static class Arguments { private final boolean help; private final String bootstrapServer; - private final 
InputStream schemaFile; + private final Supplier schemaFile; private final Format keyFormat; private final Format valueFormat; private final String topicName; @@ -122,12 +159,15 @@ static class Arguments { private final long maxInterval; private final String schemaRegistryUrl; private final InputStream propertiesFile; + private final int numThreads; + private final int msgRate; + private final boolean printRows; // CHECKSTYLE_RULES.OFF: ParameterNumberCheck Arguments( final boolean help, final String bootstrapServer, - final InputStream schemaFile, + final Supplier schemaFile, final Format keyFormat, final Format valueFormat, final String topicName, @@ -135,7 +175,10 @@ static class Arguments { final int iterations, final long maxInterval, final String schemaRegistryUrl, - final InputStream propertiesFile + final InputStream propertiesFile, + final int numThreads, + final int msgRate, + final boolean printRows ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.help = help; @@ -149,6 +192,9 @@ static class Arguments { this.maxInterval = maxInterval; this.schemaRegistryUrl = schemaRegistryUrl; this.propertiesFile = propertiesFile; + this.numThreads = numThreads; + this.msgRate = msgRate; + this.printRows = printRows; } static class ArgumentParseException extends RuntimeException { @@ -171,19 +217,22 @@ private static final class Builder { .put("format", (builder, argVal) -> builder.valueFormat = parseFormat(argVal)) .put("topic", (builder, argVal) -> builder.topicName = argVal) .put("key", (builder, argVal) -> builder.keyName = argVal) - .put("iterations", (builder, argVal) -> builder.iterations = parseIterations(argVal)) + .put("iterations", (builder, argVal) -> builder.iterations = parseInt(argVal, 1)) .put("maxInterval", - (builder, argVal) -> builder.maxInterval = parseIterations(argVal)) + (builder, argVal) -> builder.maxInterval = parseInt(argVal, 0)) .put("schemaRegistryUrl", (builder, argVal) -> builder.schemaRegistryUrl = argVal) .put("propertiesFile", - 
(builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal)) + (builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal).get()) + .put("msgRate", (builder, argVal) -> builder.msgRate = parseMsgRate(argVal)) + .put("nThreads", (builder, argVal) -> builder.numThreads = parseNumThreads(argVal)) + .put("printRows", (builder, argVal) -> builder.printRows = parsePrintRows(argVal)) .build(); private Quickstart quickstart; private boolean help; private String bootstrapServer; - private InputStream schemaFile; + private Supplier schemaFile; private Format keyFormat; private Format valueFormat; private String topicName; @@ -192,6 +241,9 @@ private static final class Builder { private long maxInterval; private String schemaRegistryUrl; private InputStream propertiesFile; + private int msgRate; + private int numThreads; + private boolean printRows; private Builder() { quickstart = null; @@ -206,6 +258,9 @@ private Builder() { maxInterval = -1; schemaRegistryUrl = "http://localhost:8081"; propertiesFile = null; + msgRate = -1; + numThreads = 1; + printRows = true; } private enum Quickstart { @@ -228,8 +283,8 @@ private enum Quickstart { this.keyName = keyName; } - public InputStream getSchemaFile() { - return getClass().getClassLoader().getResourceAsStream(schemaFileName); + public Supplier getSchemaFile() { + return () -> getClass().getClassLoader().getResourceAsStream(schemaFileName); } public String getTopicName(final Format format) { @@ -251,7 +306,22 @@ public Format getValueFormat() { Arguments build() { if (help) { - return new Arguments(true, null, null, null, null,null, null, 0, -1, null, null); + return new Arguments( + true, + null, + null, + null, + null, + null, + null, + 0, + -1, + null, + null, + 1, + -1, + true + ); } if (quickstart != null) { @@ -282,7 +352,10 @@ Arguments build() { iterations, maxInterval, schemaRegistryUrl, - propertiesFile + propertiesFile, + numThreads, + msgRate, + printRows ); } @@ -342,9 +415,15 @@ private void 
setArg(final String argName, final String argVal) { handler.accept(this, argVal); } - private static FileInputStream toFileInputStream(final String argVal) { + private static Supplier toFileInputStream(final String argVal) { try { - return new FileInputStream(argVal); + return () -> { + try { + return new FileInputStream(argVal); + } catch (final FileNotFoundException e) { + throw new RuntimeException(e); + } + }; } catch (final Exception e) { throw new IllegalArgumentException("File not found: " + argVal, e); } @@ -375,19 +454,65 @@ private static Format parseFormat(final String formatString) { } } - private static int parseIterations(final String iterationsString) { + private static int parseNumThreads(final String numThreadsString) { + try { + final int result = Integer.valueOf(numThreadsString, 10); + if (result < 0) { + throw new ArgumentParseException(String.format( + "Invalid number of threads in '%d'; must be a positive number", + result)); + } + return result; + } catch (NumberFormatException e) { + throw new ArgumentParseException(String.format( + "Invalid number of threads in '%s'; must be a positive number", + numThreadsString)); + } + } + + private static int parseMsgRate(final String msgRateString) { + try { + final int result = Integer.valueOf(msgRateString, 10); + if (result < 0) { + throw new ArgumentParseException(String.format( + "Invalid msg rate in '%d'; must be a positive number", + result)); + } + return result; + } catch (NumberFormatException e) { + throw new ArgumentParseException(String.format( + "Invalid msg rate in '%s'; must be a positive number", + msgRateString)); + } + } + + private static boolean parsePrintRows(final String printRowsString) { + switch (printRowsString.toLowerCase()) { + case "false": + return false; + case "true": + return true; + default: + throw new ArgumentParseException(String.format( + "Invalid value for printRows in '%s'; must be true or false", + printRowsString + )); + } + } + + private static int 
parseInt(final String iterationsString, final int minValue) { try { final int result = Integer.valueOf(iterationsString, 10); - if (result <= 0) { + if (result < minValue) { throw new ArgumentParseException(String.format( - "Invalid number of iterations in '%d'; must be a positive number", - result + "Invalid integer value '%d'; must be > %d", + result, minValue )); } return Integer.valueOf(iterationsString, 10); } catch (final NumberFormatException exception) { throw new ArgumentParseException(String.format( - "Invalid number of iterations in '%s'; must be a valid base 10 integer", + "Invalid integer value '%s'; must be a valid base 10 integer", iterationsString )); } diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java index 68ecdeed0fff..d7cf72105d6b 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java @@ -17,11 +17,13 @@ import static java.util.Objects.requireNonNull; +import com.google.common.util.concurrent.RateLimiter; import io.confluent.avro.random.generator.Generator; import io.confluent.ksql.GenericRow; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.util.Pair; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; import org.apache.avro.Schema; @@ -35,7 +37,7 @@ public class DataGenProducer { - // Max 100 ms between messsages. + // Max 500 ms between messages. 
public static final long INTER_MESSAGE_MAX_INTERVAL = 500; private final SerializerFactory keySerializerFactory; @@ -55,7 +57,9 @@ public void populateTopic( final String kafkaTopicName, final String key, final int messageCount, - final long maxInterval + final long maxInterval, + final boolean printRows, + final Optional rateLimiter ) { final Schema avroSchema = generator.schema(); if (avroSchema.getField(key) == null) { @@ -76,6 +80,7 @@ public void populateTopic( ); for (int i = 0; i < messageCount; i++) { + rateLimiter.ifPresent(RateLimiter::acquire); final Pair genericRowPair = rowGenerator.generateRow(); @@ -88,7 +93,8 @@ public void populateTopic( producer.send(producerRecord, new LoggingCallback(kafkaTopicName, genericRowPair.getLeft(), - genericRowPair.getRight())); + genericRowPair.getRight(), + printRows)); try { final long interval = maxInterval < 0 ? INTER_MESSAGE_MAX_INTERVAL : maxInterval; @@ -121,11 +127,17 @@ private static class LoggingCallback implements Callback { private final String topic; private final String key; private final String value; + private final boolean printOnSuccess; - LoggingCallback(final String topic, final Struct key, final GenericRow value) { + LoggingCallback( + final String topic, + final Struct key, + final GenericRow value, + final boolean printOnSuccess) { this.topic = topic; this.key = formatKey(key); this.value = Objects.toString(value); + this.printOnSuccess = printOnSuccess; } @Override @@ -138,7 +150,9 @@ public void onCompletion(final RecordMetadata metadata, final Exception e) { ); e.printStackTrace(System.err); } else { - System.out.println(key + " --> (" + value + ") ts:" + metadata.timestamp()); + if (printOnSuccess) { + System.out.println(key + " --> (" + value + ") ts:" + metadata.timestamp()); + } } } diff --git a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java index 4a7f0493d5a2..e09af53ec3c4 100644 --- 
a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java +++ b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java @@ -80,7 +80,10 @@ public void shouldPassSchemaRegistryUrl() throws Exception { 0, 0L, "srUrl", - null + null, + 1, + -1, + true ); final Properties props = DataGen.getProperties(args);