Skip to content

Commit

Permalink
Adds Pattern support to Kafka Streams Topology Dev UI
Browse files Browse the repository at this point in the history
  • Loading branch information
dcotfr committed Nov 5, 2023
1 parent fded7c0 commit f3474cb
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void accept(TopologyParserContext context) {

private static final RawTopologyItemParser SOURCE = new RawTopologyItemParser() {
private final Pattern sourcePattern = Pattern
.compile("Source:\\s+(?<source>\\S+)\\s+\\(topics:\\s+\\[(?<topics>.*)\\]\\).*");
.compile("Source:\\s+(?<source>\\S+)\\s+\\(topics:\\s+((\\[(?<topics>.*)\\])|(?<regex>.*)\\)).*");
private Matcher matcher;

@Override
Expand All @@ -77,7 +77,11 @@ public boolean test(String line) {

@Override
public void accept(TopologyParserContext context) {
context.addSources(matcher.group("source"), matcher.group("topics").split(","));
if (matcher.group("topics") != null) {
context.addSources(matcher.group("source"), matcher.group("topics").split(","));
} else if (matcher.group("regex") != null) {
context.addRegexSource(matcher.group("source"), matcher.group("regex"));
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ void addSources(String source, String[] topics) {
});
}

void addRegexSource(String source, String regex) {
currentNode = source;
final var cleanRegex = regex.trim();
if (!cleanRegex.isEmpty()) {
sources.add(cleanRegex);
graphviz.addRegexSource(source, cleanRegex);
mermaid.addRegexSource(source, cleanRegex);
}
}

void addStores(String[] stores, String processor, boolean join) {
currentNode = processor;
Arrays.stream(stores)
Expand Down Expand Up @@ -71,20 +81,15 @@ String toGraph() {
final var res = new ArrayList<String>();

res.add("digraph {");
res.add(" fontname=\"Helvetica\"; fontsize=\"10\";");
res.add(" node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=\"Helvetica\" fontsize=\"10\"];");
res.add(" fontname=Helvetica; fontsize=10;");
res.add(" node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=Helvetica fontsize=10];");
nodes.forEach(n -> res.add(' ' + n + ';'));
subGraphs.entrySet().forEach(e -> {
res.add(" subgraph cluster" + e.getKey() + " {");
res.add(" label=\"Sub-Topology: " + e.getKey() + "\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";");
e.getValue().forEach(v -> res.add(" " + v + ';'));
res.add(" }");
});
for (int i = 0; i < subGraphs.size(); i++) {
res.add(" subgraph cluster" + i + " {");
res.add(" label=\"Sub-Topology: " + i + "\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";");
res.add(" }");
}
edges.forEach(e -> res.add(' ' + e + ';'));
res.add("}");

Expand All @@ -108,6 +113,15 @@ private void addSource(String source, String topic) {
subGraphs.get(currentGraph).add(toId(source));
}

private void addRegexSource(String source, String regex) {
final var regexId = "REGEX_" + nodes.size();
final var regexLabel = regex.replaceAll("\\\\", "\\\\\\\\");
nodes.add(regexId + " [label=\"" + regexLabel + "\" shape=invhouse style=dashed margin=\"0,0\"]");
nodes.add(toId(source) + " [label=\"" + toLabel(source) + "\"]");
edges.add(regexId + " -> " + toId(source));
subGraphs.get(currentGraph).add(toId(source));
}

private void addTarget(String target, String node) {
nodes.add(toId(target) + " [label=\"" + toLabel(target) + "\"]");
edges.add(toId(node) + " -> " + toId(target));
Expand Down Expand Up @@ -164,6 +178,10 @@ private void addSource(String source, String topic) {
endpoints.add(topic + '[' + topic + "] --> " + source + '(' + toName(source) + ')');
}

private void addRegexSource(String source, String regex) {
endpoints.add("REGEX_" + endpoints.size() + '[' + regex + "] --> " + source + '(' + toName(source) + ')');
}

private void addTarget(String target, String node) {
subTopologies.add(' ' + node + '[' + toName(node) + "] --> " + target + '(' + toName(target) + ')');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,66 +31,77 @@ public void shouldParsingStayConstant() {
+ " --> KSTREAM-SINK-0000000007\n"
+ " <-- KSTREAM-AGGREGATE-0000000005\n"
+ " Sink: KSTREAM-SINK-0000000007 (topic: temperatures-aggregated)\n"
+ " <-- KTABLE-TOSTREAM-0000000006";
+ " <-- KTABLE-TOSTREAM-0000000006\n"
+ "\n"
+ " Sub-topology: 2\n"
+ " Source: KSTREAM-SOURCE-0000000008 (topics: notification\\..+)\n"
+ " --> KSTREAM-FOREACH-0000000009\n"
+ " Processor: KSTREAM-FOREACH-0000000009 (stores: [])\n"
+ " --> none\n"
+ " <-- KSTREAM-SOURCE-0000000008";
final var actual = rpcService.parseTopologyDescription(expectedDescribe);

assertEquals(expectedDescribe, actual.getString("describe"));
assertEquals("[0, 1]", actual.getString("subTopologies"));
assertEquals("[temperature-values, weather-stations]", actual.getString("sources"));
assertEquals("[0, 1, 2]", actual.getString("subTopologies"));
assertEquals("[notification\\..+, temperature-values, weather-stations]", actual.getString("sources"));
assertEquals("[temperatures-aggregated]", actual.getString("sinks"));
assertEquals("[weather-stations-STATE-STORE-0000000000, weather-stations-store]", actual.getString("stores"));
assertEquals("digraph {\n" +
" fontname=\"Helvetica\"; fontsize=\"10\";\n" +
" node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=\"Helvetica\" fontsize=\"10\"];\n" +
" weather_stations [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n" +
" KSTREAM_SOURCE_0000000001 [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n" +
" KTABLE_SOURCE_0000000002 [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n" +
" weather_stations_STATE_STORE_0000000000 [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n"
+
" temperature_values [label=\"temperature\\nvalues\" shape=invhouse margin=\"0,0\"];\n" +
" KSTREAM_SOURCE_0000000003 [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n" +
" KSTREAM_LEFTJOIN_0000000004 [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n" +
" KSTREAM_AGGREGATE_0000000005 [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n" +
" weather_stations_store [label=\"weather\\nstations\\nstore\" shape=cylinder];\n" +
" KTABLE_TOSTREAM_0000000006 [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n" +
" KSTREAM_SINK_0000000007 [label=\"KSTREAM\\nSINK\\n0000000007\"];\n" +
" temperatures_aggregated [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n" +
" subgraph cluster0 {\n" +
" label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" +
" KSTREAM_SOURCE_0000000001;\n" +
" KTABLE_SOURCE_0000000002;\n" +
" }\n" +
" subgraph cluster1 {\n" +
" label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" +
" KSTREAM_SOURCE_0000000003;\n" +
" KSTREAM_LEFTJOIN_0000000004;\n" +
" KSTREAM_AGGREGATE_0000000005;\n" +
" KTABLE_TOSTREAM_0000000006;\n" +
" KSTREAM_SINK_0000000007;\n" +
" }\n" +
" subgraph cluster0 {\n" +
" label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" +
" }\n" +
" subgraph cluster1 {\n" +
" label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" +
" }\n" +
" weather_stations -> KSTREAM_SOURCE_0000000001;\n" +
" KSTREAM_SOURCE_0000000001 -> KTABLE_SOURCE_0000000002;\n" +
" KTABLE_SOURCE_0000000002 -> weather_stations_STATE_STORE_0000000000;\n" +
" temperature_values -> KSTREAM_SOURCE_0000000003;\n" +
" KSTREAM_SOURCE_0000000003 -> KSTREAM_LEFTJOIN_0000000004;\n" +
" KSTREAM_LEFTJOIN_0000000004 -> KSTREAM_AGGREGATE_0000000005;\n" +
" KSTREAM_AGGREGATE_0000000005 -> weather_stations_store;\n" +
" KSTREAM_AGGREGATE_0000000005 -> KTABLE_TOSTREAM_0000000006;\n" +
" KTABLE_TOSTREAM_0000000006 -> KSTREAM_SINK_0000000007;\n" +
" KSTREAM_SINK_0000000007 -> temperatures_aggregated;\n" +
"}", actual.getString("graphviz"));
assertEquals("digraph {\n"
+ " fontname=Helvetica; fontsize=10;\n"
+ " node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=Helvetica fontsize=10];\n"
+ " weather_stations [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n"
+ " KSTREAM_SOURCE_0000000001 [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n"
+ " KTABLE_SOURCE_0000000002 [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n"
+ " weather_stations_STATE_STORE_0000000000 [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n"
+ " temperature_values [label=\"temperature\\nvalues\" shape=invhouse margin=\"0,0\"];\n"
+ " KSTREAM_SOURCE_0000000003 [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n"
+ " KSTREAM_LEFTJOIN_0000000004 [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n"
+ " KSTREAM_AGGREGATE_0000000005 [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n"
+ " weather_stations_store [label=\"weather\\nstations\\nstore\" shape=cylinder];\n"
+ " KTABLE_TOSTREAM_0000000006 [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n"
+ " KSTREAM_SINK_0000000007 [label=\"KSTREAM\\nSINK\\n0000000007\"];\n"
+ " temperatures_aggregated [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n"
+ " REGEX_12 [label=\"notification\\\\..+\" shape=invhouse style=dashed margin=\"0,0\"];\n"
+ " KSTREAM_SOURCE_0000000008 [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n"
+ " KSTREAM_FOREACH_0000000009 [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n"
+ " subgraph cluster0 {\n"
+ " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
+ " KSTREAM_SOURCE_0000000001;\n"
+ " KTABLE_SOURCE_0000000002;\n"
+ " }\n"
+ " subgraph cluster1 {\n"
+ " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
+ " KSTREAM_SOURCE_0000000003;\n"
+ " KSTREAM_LEFTJOIN_0000000004;\n"
+ " KSTREAM_AGGREGATE_0000000005;\n"
+ " KTABLE_TOSTREAM_0000000006;\n"
+ " KSTREAM_SINK_0000000007;\n"
+ " }\n"
+ " subgraph cluster2 {\n"
+ " label=\"Sub-Topology: 2\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
+ " KSTREAM_SOURCE_0000000008;\n"
+ " KSTREAM_FOREACH_0000000009;\n"
+ " }\n"
+ " weather_stations -> KSTREAM_SOURCE_0000000001;\n"
+ " KSTREAM_SOURCE_0000000001 -> KTABLE_SOURCE_0000000002;\n"
+ " KTABLE_SOURCE_0000000002 -> weather_stations_STATE_STORE_0000000000;\n"
+ " temperature_values -> KSTREAM_SOURCE_0000000003;\n"
+ " KSTREAM_SOURCE_0000000003 -> KSTREAM_LEFTJOIN_0000000004;\n"
+ " KSTREAM_LEFTJOIN_0000000004 -> KSTREAM_AGGREGATE_0000000005;\n"
+ " KSTREAM_AGGREGATE_0000000005 -> weather_stations_store;\n"
+ " KSTREAM_AGGREGATE_0000000005 -> KTABLE_TOSTREAM_0000000006;\n"
+ " KTABLE_TOSTREAM_0000000006 -> KSTREAM_SINK_0000000007;\n"
+ " KSTREAM_SINK_0000000007 -> temperatures_aggregated;\n"
+ " REGEX_12 -> KSTREAM_SOURCE_0000000008;\n"
+ " KSTREAM_SOURCE_0000000008 -> KSTREAM_FOREACH_0000000009;\n"
+ "}", actual.getString("graphviz"));
assertEquals("graph TD\n"
+ " weather-stations[weather-stations] --> KSTREAM-SOURCE-0000000001(KSTREAM-<br>SOURCE-<br>0000000001)\n"
+ " KTABLE-SOURCE-0000000002[KTABLE-<br>SOURCE-<br>0000000002] --> weather-stations-STATE-STORE-0000000000(weather-<br>stations-<br>STATE-<br>STORE-<br>0000000000)\n"
+ " temperature-values[temperature-values] --> KSTREAM-SOURCE-0000000003(KSTREAM-<br>SOURCE-<br>0000000003)\n"
+ " KSTREAM-AGGREGATE-0000000005[KSTREAM-<br>AGGREGATE-<br>0000000005] --> weather-stations-store(weather-<br>stations-<br>store)\n"
+ " KSTREAM-SINK-0000000007[KSTREAM-<br>SINK-<br>0000000007] --> temperatures-aggregated(temperatures-aggregated)\n"
+ " REGEX_5[notification\\..+] --> KSTREAM-SOURCE-0000000008(KSTREAM-<br>SOURCE-<br>0000000008)\n"
+ " subgraph Sub-Topology: 0\n"
+ " KSTREAM-SOURCE-0000000001[KSTREAM-<br>SOURCE-<br>0000000001] --> KTABLE-SOURCE-0000000002(KTABLE-<br>SOURCE-<br>0000000002)\n"
+ " end\n"
Expand All @@ -99,6 +110,9 @@ public void shouldParsingStayConstant() {
+ " KSTREAM-LEFTJOIN-0000000004[KSTREAM-<br>LEFTJOIN-<br>0000000004] --> KSTREAM-AGGREGATE-0000000005(KSTREAM-<br>AGGREGATE-<br>0000000005)\n"
+ " KSTREAM-AGGREGATE-0000000005[KSTREAM-<br>AGGREGATE-<br>0000000005] --> KTABLE-TOSTREAM-0000000006(KTABLE-<br>TOSTREAM-<br>0000000006)\n"
+ " KTABLE-TOSTREAM-0000000006[KTABLE-<br>TOSTREAM-<br>0000000006] --> KSTREAM-SINK-0000000007(KSTREAM-<br>SINK-<br>0000000007)\n"
+ " end\n"
+ " subgraph Sub-Topology: 2\n"
+ " KSTREAM-SOURCE-0000000008[KSTREAM-<br>SOURCE-<br>0000000008] --> KSTREAM-FOREACH-0000000009(KSTREAM-<br>FOREACH-<br>0000000009)\n"
+ " end", actual.getString("mermaid"));
}
}

0 comments on commit f3474cb

Please sign in to comment.