Skip to content

Commit

Permalink
Adjustment to support all characters in topic names
Browse files Browse the repository at this point in the history
  • Loading branch information
dcotfr committed Dec 9, 2023
1 parent e9af2cb commit 678508b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,69 @@ final class TopologyParserContext {
final Mermaid mermaid = new Mermaid();

void addSubTopology(String subTopology) {
subTopologies.add(subTopology);
graphviz.addSubTopology(subTopology);
mermaid.addSubTopology(subTopology);
final var cleanSubTopology = clean(subTopology);
subTopologies.add(cleanSubTopology);
graphviz.addSubTopology(cleanSubTopology);
mermaid.addSubTopology(cleanSubTopology);
}

void addSink(String sink, String topic) {
sinks.add(topic);
currentNode = sink;
graphviz.addSink(sink, topic);
mermaid.addSink(sink, topic);
final var cleanTopic = clean(topic);
sinks.add(cleanTopic);
final var cleanSink = clean(sink);
currentNode = clean(cleanSink);
graphviz.addSink(cleanSink, cleanTopic);
mermaid.addSink(cleanSink, cleanTopic);
}

void addSources(String source, String[] topics) {
currentNode = source;
currentNode = clean(source);
Arrays.stream(topics)
.map(String::trim).filter(topic -> !topic.isEmpty())
.forEachOrdered(topic -> {
sources.add(topic);
graphviz.addSource(source, topic);
mermaid.addSource(source, topic);
final var cleanTopic = clean(topic);
sources.add(cleanTopic);
graphviz.addSource(currentNode, cleanTopic);
mermaid.addSource(currentNode, cleanTopic);
});
}

void addRegexSource(String source, String regex) {
currentNode = source;
final var cleanRegex = regex.trim();
currentNode = clean(source);
final var cleanRegex = clean(regex);
if (!cleanRegex.isEmpty()) {
sources.add(cleanRegex);
graphviz.addRegexSource(source, cleanRegex);
mermaid.addRegexSource(source, cleanRegex);
graphviz.addRegexSource(currentNode, cleanRegex);
mermaid.addRegexSource(currentNode, cleanRegex);
}
}

void addStores(String[] stores, String processor, boolean join) {
currentNode = processor;
currentNode = clean(processor);
Arrays.stream(stores)
.map(String::trim).filter(store -> !store.isEmpty())
.forEachOrdered(store -> {
this.stores.add(store);
graphviz.addStore(store, currentNode, join);
mermaid.addStore(store, currentNode, join);
final var cleanStore = clean(store);
this.stores.add(cleanStore);
graphviz.addStore(cleanStore, currentNode, join);
mermaid.addStore(cleanStore, currentNode, join);
});
}

void addTargets(String[] targets) {
Arrays.stream(targets)
.map(String::trim).filter(target -> !("none".equals(target) || target.isEmpty()))
.forEachOrdered(target -> {
graphviz.addTarget(target, currentNode);
mermaid.addTarget(target, currentNode);
final var cleanTarget = clean(target);
graphviz.addTarget(cleanTarget, currentNode);
mermaid.addTarget(cleanTarget, currentNode);
});
}

private static String clean(String name) {
return name != null ? name.trim().replaceAll("\"", "") : null;
}

static final class Graphviz {
String currentGraph = "";
final List<String> nodes = new ArrayList<>();
Expand Down Expand Up @@ -138,7 +148,7 @@ private void addStore(String store, String node, boolean join) {
}

private static String toId(String name) {
return name.replaceAll("-", "_");
return '\"' + name + '\"';
}

private static String toLabel(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void shouldParsingStayConstant() {
+ " --> none\n"
+ " <-- KSTREAM-SOURCE-0000000001\n"
+ "Sub-topology: 1\n"
+ " Source: KSTREAM-SOURCE-0000000003 (topics: [temperature-values])\n"
+ " Source: KSTREAM-SOURCE-0000000003 (topics: [temperature.values])\n"
+ " --> KSTREAM-LEFTJOIN-0000000004\n"
+ " Processor: KSTREAM-LEFTJOIN-0000000004 (stores: [])\n"
+ " --> KSTREAM-AGGREGATE-0000000005\n"
Expand All @@ -43,62 +43,62 @@ public void shouldParsingStayConstant() {

assertEquals(expectedDescribe, actual.getString("describe"));
assertEquals("[0, 1, 2]", actual.getString("subTopologies"));
assertEquals("[notification\\..+, temperature-values, weather-stations]", actual.getString("sources"));
assertEquals("[notification\\..+, temperature.values, weather-stations]", actual.getString("sources"));
assertEquals("[temperatures-aggregated]", actual.getString("sinks"));
assertEquals("[weather-stations-STATE-STORE-0000000000, weather-stations-store]", actual.getString("stores"));
assertEquals("digraph {\n"
+ " fontname=Helvetica; fontsize=10;\n"
+ " node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=Helvetica fontsize=10];\n"
+ " weather_stations [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n"
+ " KSTREAM_SOURCE_0000000001 [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n"
+ " KTABLE_SOURCE_0000000002 [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n"
+ " weather_stations_STATE_STORE_0000000000 [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n"
+ " temperature_values [label=\"temperature\\nvalues\" shape=invhouse margin=\"0,0\"];\n"
+ " KSTREAM_SOURCE_0000000003 [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n"
+ " KSTREAM_LEFTJOIN_0000000004 [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n"
+ " KSTREAM_AGGREGATE_0000000005 [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n"
+ " weather_stations_store [label=\"weather\\nstations\\nstore\" shape=cylinder];\n"
+ " KTABLE_TOSTREAM_0000000006 [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n"
+ " KSTREAM_SINK_0000000007 [label=\"KSTREAM\\nSINK\\n0000000007\"];\n"
+ " temperatures_aggregated [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n"
+ " \"weather-stations\" [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n"
+ " \"KSTREAM-SOURCE-0000000001\" [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n"
+ " \"KTABLE-SOURCE-0000000002\" [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n"
+ " \"weather-stations-STATE-STORE-0000000000\" [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n"
+ " \"temperature.values\" [label=\"temperature.values\" shape=invhouse margin=\"0,0\"];\n"
+ " \"KSTREAM-SOURCE-0000000003\" [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n"
+ " \"KSTREAM-LEFTJOIN-0000000004\" [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n"
+ " \"KSTREAM-AGGREGATE-0000000005\" [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n"
+ " \"weather-stations-store\" [label=\"weather\\nstations\\nstore\" shape=cylinder];\n"
+ " \"KTABLE-TOSTREAM-0000000006\" [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n"
+ " \"KSTREAM-SINK-0000000007\" [label=\"KSTREAM\\nSINK\\n0000000007\"];\n"
+ " \"temperatures-aggregated\" [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n"
+ " REGEX_12 [label=\"notification\\\\..+\" shape=invhouse style=dashed margin=\"0,0\"];\n"
+ " KSTREAM_SOURCE_0000000008 [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n"
+ " KSTREAM_FOREACH_0000000009 [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n"
+ " \"KSTREAM-SOURCE-0000000008\" [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n"
+ " \"KSTREAM-FOREACH-0000000009\" [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n"
+ " subgraph cluster0 {\n"
+ " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
+ " KSTREAM_SOURCE_0000000001;\n"
+ " KTABLE_SOURCE_0000000002;\n"
+ " \"KSTREAM-SOURCE-0000000001\";\n"
+ " \"KTABLE-SOURCE-0000000002\";\n"
+ " }\n"
+ " subgraph cluster1 {\n"
+ " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
+ " KSTREAM_SOURCE_0000000003;\n"
+ " KSTREAM_LEFTJOIN_0000000004;\n"
+ " KSTREAM_AGGREGATE_0000000005;\n"
+ " KTABLE_TOSTREAM_0000000006;\n"
+ " KSTREAM_SINK_0000000007;\n"
+ " \"KSTREAM-SOURCE-0000000003\";\n"
+ " \"KSTREAM-LEFTJOIN-0000000004\";\n"
+ " \"KSTREAM-AGGREGATE-0000000005\";\n"
+ " \"KTABLE-TOSTREAM-0000000006\";\n"
+ " \"KSTREAM-SINK-0000000007\";\n"
+ " }\n"
+ " subgraph cluster2 {\n"
+ " label=\"Sub-Topology: 2\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n"
+ " KSTREAM_SOURCE_0000000008;\n"
+ " KSTREAM_FOREACH_0000000009;\n"
+ " \"KSTREAM-SOURCE-0000000008\";\n"
+ " \"KSTREAM-FOREACH-0000000009\";\n"
+ " }\n"
+ " weather_stations -> KSTREAM_SOURCE_0000000001;\n"
+ " KSTREAM_SOURCE_0000000001 -> KTABLE_SOURCE_0000000002;\n"
+ " KTABLE_SOURCE_0000000002 -> weather_stations_STATE_STORE_0000000000;\n"
+ " temperature_values -> KSTREAM_SOURCE_0000000003;\n"
+ " KSTREAM_SOURCE_0000000003 -> KSTREAM_LEFTJOIN_0000000004;\n"
+ " KSTREAM_LEFTJOIN_0000000004 -> KSTREAM_AGGREGATE_0000000005;\n"
+ " KSTREAM_AGGREGATE_0000000005 -> weather_stations_store;\n"
+ " KSTREAM_AGGREGATE_0000000005 -> KTABLE_TOSTREAM_0000000006;\n"
+ " KTABLE_TOSTREAM_0000000006 -> KSTREAM_SINK_0000000007;\n"
+ " KSTREAM_SINK_0000000007 -> temperatures_aggregated;\n"
+ " REGEX_12 -> KSTREAM_SOURCE_0000000008;\n"
+ " KSTREAM_SOURCE_0000000008 -> KSTREAM_FOREACH_0000000009;\n"
+ " \"weather-stations\" -> \"KSTREAM-SOURCE-0000000001\";\n"
+ " \"KSTREAM-SOURCE-0000000001\" -> \"KTABLE-SOURCE-0000000002\";\n"
+ " \"KTABLE-SOURCE-0000000002\" -> \"weather-stations-STATE-STORE-0000000000\";\n"
+ " \"temperature.values\" -> \"KSTREAM-SOURCE-0000000003\";\n"
+ " \"KSTREAM-SOURCE-0000000003\" -> \"KSTREAM-LEFTJOIN-0000000004\";\n"
+ " \"KSTREAM-LEFTJOIN-0000000004\" -> \"KSTREAM-AGGREGATE-0000000005\";\n"
+ " \"KSTREAM-AGGREGATE-0000000005\" -> \"weather-stations-store\";\n"
+ " \"KSTREAM-AGGREGATE-0000000005\" -> \"KTABLE-TOSTREAM-0000000006\";\n"
+ " \"KTABLE-TOSTREAM-0000000006\" -> \"KSTREAM-SINK-0000000007\";\n"
+ " \"KSTREAM-SINK-0000000007\" -> \"temperatures-aggregated\";\n"
+ " REGEX_12 -> \"KSTREAM-SOURCE-0000000008\";\n"
+ " \"KSTREAM-SOURCE-0000000008\" -> \"KSTREAM-FOREACH-0000000009\";\n"
+ "}", actual.getString("graphviz"));
assertEquals("graph TD\n"
+ " weather-stations[weather-stations] --> KSTREAM-SOURCE-0000000001(KSTREAM-<br>SOURCE-<br>0000000001)\n"
+ " KTABLE-SOURCE-0000000002[KTABLE-<br>SOURCE-<br>0000000002] --> weather-stations-STATE-STORE-0000000000(weather-<br>stations-<br>STATE-<br>STORE-<br>0000000000)\n"
+ " temperature-values[temperature-values] --> KSTREAM-SOURCE-0000000003(KSTREAM-<br>SOURCE-<br>0000000003)\n"
+ " temperature.values[temperature.values] --> KSTREAM-SOURCE-0000000003(KSTREAM-<br>SOURCE-<br>0000000003)\n"
+ " KSTREAM-AGGREGATE-0000000005[KSTREAM-<br>AGGREGATE-<br>0000000005] --> weather-stations-store(weather-<br>stations-<br>store)\n"
+ " KSTREAM-SINK-0000000007[KSTREAM-<br>SINK-<br>0000000007] --> temperatures-aggregated(temperatures-aggregated)\n"
+ " REGEX_5[notification\\..+] --> KSTREAM-SOURCE-0000000008(KSTREAM-<br>SOURCE-<br>0000000008)\n"
Expand Down

0 comments on commit 678508b

Please sign in to comment.