Skip to content

Commit

Permalink
Offset explorer - used to expose the earliest offset and latest offse…
Browse files Browse the repository at this point in the history
…t for topics (#29)
  • Loading branch information
chia7712 authored Sep 26, 2021
1 parent 05fcc4c commit 24e8ff0
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 2 deletions.
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,27 @@ Run the benchmark from release
```

### Latency Benchmark Configurations
1. --bootstrap.servers: the brokers addresses
1. --bootstrap.servers: the server to connect to
2. --consumers: the number of consumers (threads). Default: 1
3. --producers: the number of producers (threads). Default: 1
4. --valueSize: the size of record value. Default: 100 bytes
5. --duration: the duration to run this benchmark. Default: 5 seconds
6. --flushDuration: the duration to flush producer records. Default: 2 seconds

## Offset Explorer

This tool can expose both earliest offset and latest offset for all (public and private) topics.

Run the tool from source code
```shell
./gradlew run --args="offset --bootstrap.servers 192.168.50.178:19993"
```

Run the benchmark from release
```shell
./bin/App offset --bootstrap.servers 192.168.50.178:19993
```

### Offset Explorer Configurations
1. --bootstrap.servers: the server to connect to
2. --topic: the topic to search
4 changes: 3 additions & 1 deletion app/src/main/java/org/astraea/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.astraea.offset.OffsetExplorer;
import org.astraea.performance.latency.End2EndLatency;

public class App {
private static final List<Class<?>> MAIN_CLASSES = Arrays.asList(End2EndLatency.class);
private static final List<Class<?>> MAIN_CLASSES =
Arrays.asList(End2EndLatency.class, OffsetExplorer.class);

private static String toString(List<Class<?>> mains) {
return mains.stream().map(Class::getName).collect(Collectors.joining(","));
Expand Down
168 changes: 168 additions & 0 deletions app/src/main/java/org/astraea/offset/OffsetExplorer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package org.astraea.offset;

import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class OffsetExplorer {
static final String BROKERS_KEY = "--bootstrap.servers";
static final String TOPIC_KEY = "--topic";

interface Admin extends Closeable {
Set<String> topics();

Set<TopicPartition> partitions(Set<String> topics);

Map<TopicPartition, Long> earliestOffsets(Set<TopicPartition> partitions);

Map<TopicPartition, Long> latestOffsets(Set<TopicPartition> partitions);

static Admin of(org.apache.kafka.clients.admin.Admin admin) {
return new Admin() {
@Override
public void close() {
admin.close();
}

@Override
public Set<String> topics() {
try {
return admin.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Set<TopicPartition> partitions(Set<String> topics) {
try {
return admin.describeTopics(topics).all().get().entrySet().stream()
.flatMap(
e ->
e.getValue().partitions().stream()
.map(p -> new TopicPartition(e.getKey(), p.partition())))
.collect(Collectors.toSet());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Map<TopicPartition, Long> earliestOffsets(Set<TopicPartition> partitions) {
try {
return admin
.listOffsets(
partitions.stream()
.collect(Collectors.toMap(e -> e, e -> new OffsetSpec.EarliestSpec())))
.all()
.get()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Map<TopicPartition, Long> latestOffsets(Set<TopicPartition> partitions) {
try {
return admin
.listOffsets(
partitions.stream()
.collect(Collectors.toMap(e -> e, e -> new OffsetSpec.LatestSpec())))
.all()
.get()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
}

private static String help() {
return "Available configs:\n"
+ BROKERS_KEY
+ ": (REQUIRED) The server to connect to\n"
+ TOPIC_KEY
+ ": (OPTIONAL) the topic to check";
}

static Map<TopicPartition, Map.Entry<Long, Long>> execute(Admin admin, Set<String> topics) {
var topicPartitions = admin.partitions(topics);

var earliestOffsets = admin.earliestOffsets(topicPartitions);
var latestOffsets = admin.latestOffsets(topicPartitions);

var result = new LinkedHashMap<TopicPartition, Map.Entry<Long, Long>>();

topics.forEach(
topic ->
earliestOffsets.entrySet().stream()
.filter(e -> e.getKey().topic().equals(topic))
.sorted(
Comparator.comparing((Map.Entry<TopicPartition, Long> o) -> o.getKey().topic())
.thenComparingInt(o -> o.getKey().partition()))
.forEach(
earliestOffset ->
latestOffsets.entrySet().stream()
.filter(e -> e.getKey().equals(earliestOffset.getKey()))
.forEach(
latestOffset ->
result.put(
earliestOffset.getKey(),
Map.entry(
earliestOffset.getValue(), latestOffset.getValue())))));

return result;
}

public static void main(String[] args) throws IOException {
var configs = toMaps(args);
try (var admin = Admin.of(AdminClient.create(toAdminProps(configs)))) {
var topics =
configs.containsKey(TOPIC_KEY)
? Collections.singleton(configs.get(TOPIC_KEY))
: admin.topics();

var result = execute(admin, topics);
result.forEach(
(k, v) ->
System.out.println(
"topic: "
+ k.topic()
+ " partition: "
+ k.partition()
+ " start: "
+ v.getKey()
+ " end: "
+ v.getValue()));
}
}

static Map<String, Object> toAdminProps(Map<String, String> argMap) {
var props = new HashMap<String, Object>();
if (!argMap.containsKey(BROKERS_KEY)) throw new IllegalArgumentException(help());
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, argMap.get(BROKERS_KEY));
return props;
}

private static Map<String, String> toMaps(String[] args) {
var argMap = new HashMap<String, String>();
for (var i = 0; i <= args.length; i += 2) {
if (i + 1 >= args.length) break;
argMap.put(args[i], args[i + 1]);
}
return argMap;
}
}
65 changes: 65 additions & 0 deletions app/src/test/java/org/astraea/offset/OffsetExplorerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.astraea.offset;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class OffsetExplorerTest {

@Test
void testToAdminProps() {
Assertions.assertThrows(
IllegalArgumentException.class, () -> OffsetExplorer.toAdminProps(Collections.emptyMap()));

Assertions.assertEquals(
"brokers",
OffsetExplorer.toAdminProps(Map.of(OffsetExplorer.BROKERS_KEY, "brokers"))
.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
}

@Test
void testExecute() {
var topicName = "topic";
var partition = 1000;
var latestOffset = 1000L;
var earliestOffset = 100L;

try (var admin =
new OffsetExplorer.Admin() {
@Override
public void close() {}

@Override
public Set<String> topics() {
return Collections.singleton("topic");
}

@Override
public Set<TopicPartition> partitions(Set<String> topics) {
return Collections.singleton(new TopicPartition(topicName, partition));
}

@Override
public Map<TopicPartition, Long> earliestOffsets(Set<TopicPartition> partitions) {
return Map.of(new TopicPartition(topicName, partition), earliestOffset);
}

@Override
public Map<TopicPartition, Long> latestOffsets(Set<TopicPartition> partitions) {
return Map.of(new TopicPartition(topicName, partition), latestOffset);
}
}) {
var result = OffsetExplorer.execute(admin, Collections.singleton(topicName));
Assertions.assertEquals(1, result.size());
var item = result.entrySet().iterator().next();
Assertions.assertEquals(topicName, item.getKey().topic());
Assertions.assertEquals(partition, item.getKey().partition());
Assertions.assertEquals(earliestOffset, item.getValue().getKey());
Assertions.assertEquals(latestOffset, item.getValue().getValue());
}
}
}

0 comments on commit 24e8ff0

Please sign in to comment.