From aaa62952ee14d55da0049ee644f1cc29511ecbcd Mon Sep 17 00:00:00 2001 From: David Cotton Date: Wed, 15 Nov 2023 23:14:55 +0100 Subject: [PATCH] Adjustment to support all characters in topic names --- .../runtime/devui/TopologyParserContext.java | 54 +++++++------ .../devui/KafkaStreamsJsonRPCServiceTest.java | 76 +++++++++---------- 2 files changed, 70 insertions(+), 60 deletions(-) diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java index 6a047065af724a..db4139f54d95da 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java @@ -18,47 +18,52 @@ final class TopologyParserContext { final Mermaid mermaid = new Mermaid(); void addSubTopology(String subTopology) { - subTopologies.add(subTopology); - graphviz.addSubTopology(subTopology); - mermaid.addSubTopology(subTopology); + final var cleanSubTopology = clean(subTopology); + subTopologies.add(cleanSubTopology); + graphviz.addSubTopology(cleanSubTopology); + mermaid.addSubTopology(cleanSubTopology); } void addSink(String sink, String topic) { - sinks.add(topic); - currentNode = sink; - graphviz.addSink(sink, topic); - mermaid.addSink(sink, topic); + final var cleanTopic = clean(topic); + sinks.add(cleanTopic); + final var cleanSink = clean(sink); + currentNode = clean(cleanSink); + graphviz.addSink(cleanSink, cleanTopic); + mermaid.addSink(cleanSink, cleanTopic); } void addSources(String source, String[] topics) { - currentNode = source; + currentNode = clean(source); Arrays.stream(topics) .map(String::trim).filter(topic -> !topic.isEmpty()) .forEachOrdered(topic -> { - sources.add(topic); - graphviz.addSource(source, topic); - mermaid.addSource(source, topic); + final var cleanTopic = clean(topic); + sources.add(cleanTopic); + graphviz.addSource(currentNode, cleanTopic); + mermaid.addSource(currentNode, cleanTopic); }); } void addRegexSource(String source, String regex) { - currentNode = source; - final var cleanRegex = regex.trim(); + currentNode = clean(source); + final var cleanRegex = clean(regex); if (!cleanRegex.isEmpty()) { sources.add(cleanRegex); - graphviz.addRegexSource(source, cleanRegex); - mermaid.addRegexSource(source, cleanRegex); + graphviz.addRegexSource(currentNode, cleanRegex); + mermaid.addRegexSource(currentNode, cleanRegex); } } void addStores(String[] stores, String processor, boolean join) { - currentNode = processor; + currentNode = clean(processor); Arrays.stream(stores) .map(String::trim).filter(store -> !store.isEmpty()) .forEachOrdered(store -> { - this.stores.add(store); - graphviz.addStore(store, currentNode, join); - mermaid.addStore(store, currentNode, join); + final var cleanStore = clean(store); + this.stores.add(cleanStore); + graphviz.addStore(cleanStore, currentNode, join); + mermaid.addStore(cleanStore, currentNode, join); }); } @@ -66,11 +71,16 @@ void addTargets(String[] targets) { Arrays.stream(targets) .map(String::trim).filter(target -> !("none".equals(target) || target.isEmpty())) .forEachOrdered(target -> { - graphviz.addTarget(target, currentNode); - mermaid.addTarget(target, currentNode); + final var cleanTarget = clean(target); + graphviz.addTarget(cleanTarget, currentNode); + mermaid.addTarget(cleanTarget, currentNode); }); } + private static String clean(String name) { + return name != null ? name.trim().replaceAll("\"", "") : null; + } + static final class Graphviz { String currentGraph = ""; final List nodes = new ArrayList<>(); @@ -138,7 +148,7 @@ private void addStore(String store, String node, boolean join) { } private static String toId(String name) { - return name.replaceAll("-", "_"); + return '\"' + name + '\"'; } private static String toLabel(String name) { diff --git a/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java b/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java index 854ac92d972512..e1f83f489792b9 100644 --- a/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java +++ b/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java @@ -19,7 +19,7 @@ public void shouldParsingStayConstant() { + " --> none\n" + " <-- KSTREAM-SOURCE-0000000001\n" + "Sub-topology: 1\n" - + " Source: KSTREAM-SOURCE-0000000003 (topics: [temperature-values])\n" + + " Source: KSTREAM-SOURCE-0000000003 (topics: [temperature.values])\n" + " --> KSTREAM-LEFTJOIN-0000000004\n" + " Processor: KSTREAM-LEFTJOIN-0000000004 (stores: [])\n" + " --> KSTREAM-AGGREGATE-0000000005\n" @@ -43,62 +43,62 @@ public void shouldParsingStayConstant() { assertEquals(expectedDescribe, actual.getString("describe")); assertEquals("[0, 1, 2]", actual.getString("subTopologies")); - assertEquals("[notification\\..+, temperature-values, weather-stations]", actual.getString("sources")); + assertEquals("[notification\\..+, temperature.values, weather-stations]", actual.getString("sources")); assertEquals("[temperatures-aggregated]", actual.getString("sinks")); assertEquals("[weather-stations-STATE-STORE-0000000000, weather-stations-store]", actual.getString("stores")); assertEquals("digraph {\n" + " fontname=Helvetica; fontsize=10;\n" + " node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=Helvetica fontsize=10];\n" - + " weather_stations [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n" - + " KSTREAM_SOURCE_0000000001 [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n" - + " KTABLE_SOURCE_0000000002 [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n" - + " weather_stations_STATE_STORE_0000000000 [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n" - + " temperature_values [label=\"temperature\\nvalues\" shape=invhouse margin=\"0,0\"];\n" - + " KSTREAM_SOURCE_0000000003 [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n" - + " KSTREAM_LEFTJOIN_0000000004 [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n" - + " KSTREAM_AGGREGATE_0000000005 [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n" - + " weather_stations_store [label=\"weather\\nstations\\nstore\" shape=cylinder];\n" - + " KTABLE_TOSTREAM_0000000006 [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n" - + " KSTREAM_SINK_0000000007 [label=\"KSTREAM\\nSINK\\n0000000007\"];\n" - + " temperatures_aggregated [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n" + + " \"weather-stations\" [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n" + + " \"KSTREAM-SOURCE-0000000001\" [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n" + + " \"KTABLE-SOURCE-0000000002\" [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n" + + " \"weather-stations-STATE-STORE-0000000000\" [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n" + + " \"temperature.values\" [label=\"temperature.values\" shape=invhouse margin=\"0,0\"];\n" + + " \"KSTREAM-SOURCE-0000000003\" [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n" + + " \"KSTREAM-LEFTJOIN-0000000004\" [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n" + + " \"KSTREAM-AGGREGATE-0000000005\" [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n" + + " \"weather-stations-store\" [label=\"weather\\nstations\\nstore\" shape=cylinder];\n" + + " \"KTABLE-TOSTREAM-0000000006\" [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n" + + " \"KSTREAM-SINK-0000000007\" [label=\"KSTREAM\\nSINK\\n0000000007\"];\n" + + " \"temperatures-aggregated\" [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n" + " REGEX_12 [label=\"notification\\\\..+\" shape=invhouse style=dashed margin=\"0,0\"];\n" - + " KSTREAM_SOURCE_0000000008 [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n" - + " KSTREAM_FOREACH_0000000009 [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n" + + " \"KSTREAM-SOURCE-0000000008\" [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n" + + " \"KSTREAM-FOREACH-0000000009\" [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n" + " subgraph cluster0 {\n" + " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" - + " KSTREAM_SOURCE_0000000001;\n" - + " KTABLE_SOURCE_0000000002;\n" + + " \"KSTREAM-SOURCE-0000000001\";\n" + + " \"KTABLE-SOURCE-0000000002\";\n" + " }\n" + " subgraph cluster1 {\n" + " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" - + " KSTREAM_SOURCE_0000000003;\n" - + " KSTREAM_LEFTJOIN_0000000004;\n" - + " KSTREAM_AGGREGATE_0000000005;\n" - + " KTABLE_TOSTREAM_0000000006;\n" - + " KSTREAM_SINK_0000000007;\n" + + " \"KSTREAM-SOURCE-0000000003\";\n" + + " \"KSTREAM-LEFTJOIN-0000000004\";\n" + + " \"KSTREAM-AGGREGATE-0000000005\";\n" + + " \"KTABLE-TOSTREAM-0000000006\";\n" + + " \"KSTREAM-SINK-0000000007\";\n" + " }\n" + " subgraph cluster2 {\n" + " label=\"Sub-Topology: 2\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" - + " KSTREAM_SOURCE_0000000008;\n" - + " KSTREAM_FOREACH_0000000009;\n" + + " \"KSTREAM-SOURCE-0000000008\";\n" + + " \"KSTREAM-FOREACH-0000000009\";\n" + " }\n" - + " weather_stations -> KSTREAM_SOURCE_0000000001;\n" - + " KSTREAM_SOURCE_0000000001 -> KTABLE_SOURCE_0000000002;\n" - + " KTABLE_SOURCE_0000000002 -> weather_stations_STATE_STORE_0000000000;\n" - + " temperature_values -> KSTREAM_SOURCE_0000000003;\n" - + " KSTREAM_SOURCE_0000000003 -> KSTREAM_LEFTJOIN_0000000004;\n" - + " KSTREAM_LEFTJOIN_0000000004 -> KSTREAM_AGGREGATE_0000000005;\n" - + " KSTREAM_AGGREGATE_0000000005 -> weather_stations_store;\n" - + " KSTREAM_AGGREGATE_0000000005 -> KTABLE_TOSTREAM_0000000006;\n" - + " KTABLE_TOSTREAM_0000000006 -> KSTREAM_SINK_0000000007;\n" - + " KSTREAM_SINK_0000000007 -> temperatures_aggregated;\n" - + " REGEX_12 -> KSTREAM_SOURCE_0000000008;\n" - + " KSTREAM_SOURCE_0000000008 -> KSTREAM_FOREACH_0000000009;\n" + + " \"weather-stations\" -> \"KSTREAM-SOURCE-0000000001\";\n" + + " \"KSTREAM-SOURCE-0000000001\" -> \"KTABLE-SOURCE-0000000002\";\n" + + " \"KTABLE-SOURCE-0000000002\" -> \"weather-stations-STATE-STORE-0000000000\";\n" + + " \"temperature.values\" -> \"KSTREAM-SOURCE-0000000003\";\n" + + " \"KSTREAM-SOURCE-0000000003\" -> \"KSTREAM-LEFTJOIN-0000000004\";\n" + + " \"KSTREAM-LEFTJOIN-0000000004\" -> \"KSTREAM-AGGREGATE-0000000005\";\n" + + " \"KSTREAM-AGGREGATE-0000000005\" -> \"weather-stations-store\";\n" + + " \"KSTREAM-AGGREGATE-0000000005\" -> \"KTABLE-TOSTREAM-0000000006\";\n" + + " \"KTABLE-TOSTREAM-0000000006\" -> \"KSTREAM-SINK-0000000007\";\n" + + " \"KSTREAM-SINK-0000000007\" -> \"temperatures-aggregated\";\n" + + " REGEX_12 -> \"KSTREAM-SOURCE-0000000008\";\n" + + " \"KSTREAM-SOURCE-0000000008\" -> \"KSTREAM-FOREACH-0000000009\";\n" + "}", actual.getString("graphviz")); assertEquals("graph TD\n" + " weather-stations[weather-stations] --> KSTREAM-SOURCE-0000000001(KSTREAM-
SOURCE-
0000000001)\n" + " KTABLE-SOURCE-0000000002[KTABLE-
SOURCE-
0000000002] --> weather-stations-STATE-STORE-0000000000(weather-
stations-
STATE-
STORE-
0000000000)\n" - + " temperature-values[temperature-values] --> KSTREAM-SOURCE-0000000003(KSTREAM-
SOURCE-
0000000003)\n" + + " temperature.values[temperature.values] --> KSTREAM-SOURCE-0000000003(KSTREAM-
SOURCE-
0000000003)\n" + " KSTREAM-AGGREGATE-0000000005[KSTREAM-
AGGREGATE-
0000000005] --> weather-stations-store(weather-
stations-
store)\n" + " KSTREAM-SINK-0000000007[KSTREAM-
SINK-
0000000007] --> temperatures-aggregated(temperatures-aggregated)\n" + " REGEX_5[notification\\..+] --> KSTREAM-SOURCE-0000000008(KSTREAM-
SOURCE-
0000000008)\n"