Skip to content

Commit

Permalink
feat: enhance datagen for use as a load generator
Browse files Browse the repository at this point in the history
Resurrecting some ancient enhancements to datagen so that we can use it
to generate load:

- Add a flag to disable printing each row
- Add a flag to control the number of threads producing data
- Add a flag to control the total message rate (msgs/second) across all the
  threads. The rate limiting is implemented using a token bucket.
  • Loading branch information
rodesai committed Aug 21, 2019
1 parent bc7148f commit 9295204
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 34 deletions.
181 changes: 153 additions & 28 deletions ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@
package io.confluent.ksql.datagen;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.avro.random.generator.Generator;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.util.KsqlConfig;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public final class DataGen {

Expand Down Expand Up @@ -58,19 +63,48 @@ static void run(final String... args) throws IOException {
return;
}

final Generator generator = new Generator(arguments.schemaFile, new Random());
final Properties props = getProperties(arguments);
final DataGenProducer dataProducer = ProducerFactory
.getProducer(arguments.keyFormat, arguments.valueFormat, props);
final Optional<RateLimiter> rateLimiter = arguments.msgRate != -1
? Optional.of(RateLimiter.create(arguments.msgRate)) : Optional.empty();

dataProducer.populateTopic(
props,
generator,
arguments.topicName,
arguments.keyName,
arguments.iterations,
arguments.maxInterval
);
final List<Thread> threads = new LinkedList<>();
for (int i = 0; i < arguments.numThreads; i++) {
threads.add(startProducerThread(arguments, dataProducer, props, rateLimiter));
}

for (final Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
// interrupted. exit with error
System.exit(1);
}
}
}

private static Thread startProducerThread(
final Arguments arguments,
final DataGenProducer dataProducer,
final Properties props,
final Optional<RateLimiter> rateLimiter) throws IOException {
final Generator generator = new Generator(arguments.schemaFile.get(), new Random());
final Thread t = new Thread(() -> {
dataProducer.populateTopic(
props,
generator,
arguments.topicName,
arguments.keyName,
arguments.iterations,
arguments.maxInterval,
arguments.printRows,
rateLimiter
);
});
t.setDaemon(true);
t.start();
return t;
}

static Properties getProperties(final Arguments arguments) throws IOException {
Expand Down Expand Up @@ -105,15 +139,18 @@ private static void usage() {
+ "key=<name of key column> " + newLine
+ "[iterations=<number of rows> (defaults to 1,000,000)] " + newLine
+ "[maxInterval=<Max time in ms between rows> (defaults to 500)] " + newLine
+ "[propertiesFile=<file specifying Kafka client properties>]" + newLine
+ "[propertiesFile=<file specifying Kafka client properties>] " + newLine
+ "[nThreads=<number of producers to start>] " + newLine
+ "[msgRate=<rate to produce in msgs/second>] " + newLine
+ "[printRows=<true|false>]" + newLine
);
}

static class Arguments {

private final boolean help;
private final String bootstrapServer;
private final InputStream schemaFile;
private final Supplier<InputStream> schemaFile;
private final Format keyFormat;
private final Format valueFormat;
private final String topicName;
Expand All @@ -122,20 +159,26 @@ static class Arguments {
private final long maxInterval;
private final String schemaRegistryUrl;
private final InputStream propertiesFile;
private final int numThreads;
private final int msgRate;
private final boolean printRows;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
Arguments(
final boolean help,
final String bootstrapServer,
final InputStream schemaFile,
final Supplier<InputStream> schemaFile,
final Format keyFormat,
final Format valueFormat,
final String topicName,
final String keyName,
final int iterations,
final long maxInterval,
final String schemaRegistryUrl,
final InputStream propertiesFile
final InputStream propertiesFile,
final int numThreads,
final int msgRate,
final boolean printRows
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.help = help;
Expand All @@ -149,6 +192,9 @@ static class Arguments {
this.maxInterval = maxInterval;
this.schemaRegistryUrl = schemaRegistryUrl;
this.propertiesFile = propertiesFile;
this.numThreads = numThreads;
this.msgRate = msgRate;
this.printRows = printRows;
}

static class ArgumentParseException extends RuntimeException {
Expand All @@ -171,19 +217,22 @@ private static final class Builder {
.put("format", (builder, argVal) -> builder.valueFormat = parseFormat(argVal))
.put("topic", (builder, argVal) -> builder.topicName = argVal)
.put("key", (builder, argVal) -> builder.keyName = argVal)
.put("iterations", (builder, argVal) -> builder.iterations = parseIterations(argVal))
.put("iterations", (builder, argVal) -> builder.iterations = parseInt(argVal, 1))
.put("maxInterval",
(builder, argVal) -> builder.maxInterval = parseIterations(argVal))
(builder, argVal) -> builder.maxInterval = parseInt(argVal, 0))
.put("schemaRegistryUrl", (builder, argVal) -> builder.schemaRegistryUrl = argVal)
.put("propertiesFile",
(builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal))
(builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal).get())
.put("msgRate", (builder, argVal) -> builder.msgRate = parseMsgRate(argVal))
.put("nThreads", (builder, argVal) -> builder.numThreads = parseNumThreads(argVal))
.put("printRows", (builder, argVal) -> builder.printRows = parsePrintRows(argVal))
.build();

private Quickstart quickstart;

private boolean help;
private String bootstrapServer;
private InputStream schemaFile;
private Supplier<InputStream> schemaFile;
private Format keyFormat;
private Format valueFormat;
private String topicName;
Expand All @@ -192,6 +241,9 @@ private static final class Builder {
private long maxInterval;
private String schemaRegistryUrl;
private InputStream propertiesFile;
private int msgRate;
private int numThreads;
private boolean printRows;

private Builder() {
quickstart = null;
Expand All @@ -206,6 +258,9 @@ private Builder() {
maxInterval = -1;
schemaRegistryUrl = "http://localhost:8081";
propertiesFile = null;
msgRate = -1;
numThreads = 1;
printRows = true;
}

private enum Quickstart {
Expand All @@ -228,8 +283,8 @@ private enum Quickstart {
this.keyName = keyName;
}

public InputStream getSchemaFile() {
return getClass().getClassLoader().getResourceAsStream(schemaFileName);
public Supplier<InputStream> getSchemaFile() {
return () -> getClass().getClassLoader().getResourceAsStream(schemaFileName);
}

public String getTopicName(final Format format) {
Expand All @@ -251,7 +306,22 @@ public Format getValueFormat() {

Arguments build() {
if (help) {
return new Arguments(true, null, null, null, null,null, null, 0, -1, null, null);
return new Arguments(
true,
null,
null,
null,
null,
null,
null,
0,
-1,
null,
null,
1,
-1,
true
);
}

if (quickstart != null) {
Expand Down Expand Up @@ -282,7 +352,10 @@ Arguments build() {
iterations,
maxInterval,
schemaRegistryUrl,
propertiesFile
propertiesFile,
numThreads,
msgRate,
printRows
);
}

Expand Down Expand Up @@ -342,9 +415,15 @@ private void setArg(final String argName, final String argVal) {
handler.accept(this, argVal);
}

private static FileInputStream toFileInputStream(final String argVal) {
private static Supplier<InputStream> toFileInputStream(final String argVal) {
try {
return new FileInputStream(argVal);
return () -> {
try {
return new FileInputStream(argVal);
} catch (final FileNotFoundException e) {
throw new RuntimeException(e);
}
};
} catch (final Exception e) {
throw new IllegalArgumentException("File not found: " + argVal, e);
}
Expand Down Expand Up @@ -375,19 +454,65 @@ private static Format parseFormat(final String formatString) {
}
}

private static int parseIterations(final String iterationsString) {
private static int parseNumThreads(final String numThreadsString) {
try {
final int result = Integer.valueOf(numThreadsString, 10);
if (result < 0) {
throw new ArgumentParseException(String.format(
"Invalid number of threads in '%d'; must be a positive number",
result));
}
return result;
} catch (NumberFormatException e) {
throw new ArgumentParseException(String.format(
"Invalid number of threads in '%s'; must be a positive number",
numThreadsString));
}
}

private static int parseMsgRate(final String msgRateString) {
try {
final int result = Integer.valueOf(msgRateString, 10);
if (result < 0) {
throw new ArgumentParseException(String.format(
"Invalid msg rate in '%d'; must be a positive number",
result));
}
return result;
} catch (NumberFormatException e) {
throw new ArgumentParseException(String.format(
"Invalid msg rate in '%s'; must be a positive number",
msgRateString));
}
}

private static boolean parsePrintRows(final String printRowsString) {
switch (printRowsString.toLowerCase()) {
case "false":
return false;
case "true":
return true;
default:
throw new ArgumentParseException(String.format(
"Invalid value for printRows in '%s'; must be true or false",
printRowsString
));
}
}

private static int parseInt(final String iterationsString, final int minValue) {
try {
final int result = Integer.valueOf(iterationsString, 10);
if (result <= 0) {
if (result < minValue) {
throw new ArgumentParseException(String.format(
"Invalid number of iterations in '%d'; must be a positive number",
result
"Invalid integer value '%d'; must be > %d",
result, minValue
));
}
return Integer.valueOf(iterationsString, 10);
} catch (final NumberFormatException exception) {
throw new ArgumentParseException(String.format(
"Invalid number of iterations in '%s'; must be a valid base 10 integer",
"Invalid integer value '%s'; must be a valid base 10 integer",
iterationsString
));
}
Expand Down
Loading

0 comments on commit 9295204

Please sign in to comment.