From f8e1074b53075e08202e29ae0efc2f3b9998e283 Mon Sep 17 00:00:00 2001 From: David Cotton Date: Thu, 26 Oct 2023 02:06:01 +0200 Subject: [PATCH] Kafka Streams Dev UI migration to v2 --- build-parent/pom.xml | 104 ++++++++++ extensions/kafka-streams/deployment/pom.xml | 4 + .../devui/KafkaStreamsDevUIProcessor.java | 30 +++ .../dev-ui/qwc-kafka-streams-topology.js | 123 ++++++++++++ .../devui/KafkaStreamsJsonRPCService.java | 134 +++++++++++++ .../runtime/devui/TopologyParserContext.java | 183 ++++++++++++++++++ .../devui/KafkaStreamsJsonRPCServiceTest.java | 104 ++++++++++ 7 files changed, 682 insertions(+) create mode 100644 extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devui/KafkaStreamsDevUIProcessor.java create mode 100644 extensions/kafka-streams/deployment/src/main/resources/dev-ui/qwc-kafka-streams-topology.js create mode 100644 extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCService.java create mode 100644 extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java create mode 100644 extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java diff --git a/build-parent/pom.xml b/build-parent/pom.xml index 36206399fc121e..bd06d38ef306f4 100644 --- a/build-parent/pom.xml +++ b/build-parent/pom.xml @@ -487,6 +487,110 @@ runtime + + + org.mvnpm.at.hpcc-js + wasm + 2.14.1 + runtime + + + org.mvnpm + yargs + 17.7.2 + runtime + + + org.mvnpm + cliui + 8.0.1 + runtime + + + org.mvnpm + escalade + 3.1.1 + runtime + + + org.mvnpm + get-caller-file + 2.0.5 + runtime + + + org.mvnpm + require-directory + 2.1.1 + runtime + + + org.mvnpm + string-width + 4.2.3 + runtime + + + org.mvnpm + y18n + 5.0.8 + runtime + + + org.mvnpm + yargs-parser + 21.1.1 + runtime + + + org.mvnpm + strip-ansi + 6.0.1 + runtime + + + org.mvnpm + wrap-ansi + 7.0.0 + runtime + + + 
org.mvnpm + emoji-regex + 8.0.0 + runtime + + + org.mvnpm + is-fullwidth-code-point + 3.0.0 + runtime + + + org.mvnpm + ansi-regex + 5.0.1 + runtime + + + org.mvnpm + ansi-styles + 4.3.0 + runtime + + + org.mvnpm + color-convert + 2.0.1 + runtime + + + org.mvnpm + color-name + 1.1.4 + runtime + + org.mvnpm.at.vanillawc diff --git a/extensions/kafka-streams/deployment/pom.xml b/extensions/kafka-streams/deployment/pom.xml index c2d06a888827af..c8ed75033ec8e0 100644 --- a/extensions/kafka-streams/deployment/pom.xml +++ b/extensions/kafka-streams/deployment/pom.xml @@ -33,6 +33,10 @@ io.quarkus quarkus-smallrye-health-spi + + org.mvnpm.at.hpcc-js + wasm + diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devui/KafkaStreamsDevUIProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devui/KafkaStreamsDevUIProcessor.java new file mode 100644 index 00000000000000..ddc2f2e7de7af1 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devui/KafkaStreamsDevUIProcessor.java @@ -0,0 +1,30 @@ +package io.quarkus.kafka.streams.deployment.devui; + +import io.quarkus.deployment.IsDevelopment; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.devui.spi.JsonRPCProvidersBuildItem; +import io.quarkus.devui.spi.page.CardPageBuildItem; +import io.quarkus.devui.spi.page.Page; +import io.quarkus.kafka.streams.runtime.devui.KafkaStreamsJsonRPCService; + +public class KafkaStreamsDevUIProcessor { + + @BuildStep(onlyIf = IsDevelopment.class) + public void createPages(BuildProducer cardPageProducer) { + + CardPageBuildItem cardPageBuildItem = new CardPageBuildItem(); + + cardPageBuildItem.addPage(Page.webComponentPageBuilder() + .componentLink("qwc-kafka-streams-topology.js") + .title("Topology") + .icon("font-awesome-solid:diagram-project")); + + 
cardPageProducer.produce(cardPageBuildItem); + } + + @BuildStep(onlyIf = IsDevelopment.class) + public void createJsonRPCService(BuildProducer jsonRPCServiceProducer) { + jsonRPCServiceProducer.produce(new JsonRPCProvidersBuildItem(KafkaStreamsJsonRPCService.class)); + } +} \ No newline at end of file diff --git a/extensions/kafka-streams/deployment/src/main/resources/dev-ui/qwc-kafka-streams-topology.js b/extensions/kafka-streams/deployment/src/main/resources/dev-ui/qwc-kafka-streams-topology.js new file mode 100644 index 00000000000000..7e69d75f9976e2 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/resources/dev-ui/qwc-kafka-streams-topology.js @@ -0,0 +1,123 @@ +import { QwcHotReloadElement, html, css } from 'qwc-hot-reload-element'; +import { unsafeHTML } from 'lit/directives/unsafe-html.js'; +import { JsonRpc } from 'jsonrpc'; + +import { Graphviz } from "@hpcc-js/wasm/graphviz.js"; + +import '@vaadin/details'; +import '@vaadin/tabs'; +import '@vaadin/vertical-layout'; +import 'qui-badge'; +import 'qui-code-block'; + +/** + * This component shows the Kafka Streams Topology + */ +export class QwcKafkaStreamsTopology extends QwcHotReloadElement { + + jsonRpc = new JsonRpc(this); + + static styles = css` + .codeBlock { + width: 100%; + height: auto; + } + `; + + static properties = { + _topology: {state: true}, + _graphviz: {state: true}, + _tabContent: {state: true} + }; + + constructor() { + super(); + this._topology = null; + this._graphviz = null; + this._tabContent = ''; + } + + connectedCallback() { + super.connectedCallback(); + Graphviz.load().then(r => this._graphviz = r); + this.hotReload() + } + + render() { + if (this._topology) { + return html` + Graph + Details + Describe + Graphviz + Mermaid + +

${this._tabContent}

`; + } + + return html` + `; + } + + hotReload() { + this._topology = null; + this.jsonRpc.getTopology().then(jsonRpcResponse => { + this._topology = jsonRpcResponse.result; + }); + } + + _tabSelectedChanged(n) { + switch(n) { + case 1 : this._selectDetailsTab(); break; + case 2 : this._selectDescribeTab(); break; + case 3 : this._selectGraphvizTab(); break; + case 4 : this._selectMermaidTab(); break; + default : this._selectGraphTab(); + } + } + + _selectGraphTab() { + if (this._graphviz) { + let g = this._graphviz.dot(this._topology.graphviz); + this._tabContent = html`${unsafeHTML(g)}`; + } else { + this._tabContent = html`Graph engine not started.`; + } + } + + _selectDetailsTab() { + this._tabContent = html` + + + + + + + + + + + + + + + + +
Sub-topologies${this._topology.subTopologies.length}${this._topology.subTopologies.map((subTopology) => html`${subTopology}`)}
Sources${this._topology.sources.length}${this._topology.sources.map((source) => html`${source}`)}
Sinks${this._topology.sinks.length}${this._topology.sinks.map((sink) => html`${sink}`)}
Stores${this._topology.stores.length}${this._topology.stores.map((store) => html`${store}`)}
`; + } + + _selectDescribeTab() { + this._tabContent = html``; + } + + _selectGraphvizTab() { + this._tabContent = html``; + } + + _selectMermaidTab() { + this._tabContent = html``; + } +} +customElements.define('qwc-kafka-streams-topology', QwcKafkaStreamsTopology); \ No newline at end of file diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCService.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCService.java new file mode 100644 index 00000000000000..fc82604f416587 --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCService.java @@ -0,0 +1,134 @@ +package io.quarkus.kafka.streams.runtime.devui; + +import java.util.Arrays; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import jakarta.inject.Inject; + +import org.apache.kafka.streams.Topology; + +import io.smallrye.common.annotation.NonBlocking; +import io.vertx.core.json.JsonObject; + +public class KafkaStreamsJsonRPCService { + @Inject + Topology topology; + + @NonBlocking + public JsonObject getTopology() { + return parseTopologyDescription(topology.describe() != null ? 
topology.describe().toString() : "");
    }

    JsonObject parseTopologyDescription(String topologyDescription) {
        final var res = new JsonObject();

        final var context = new TopologyParserContext();
        Arrays.stream(topologyDescription.split("\n"))
                .map(String::trim)
                .forEachOrdered(line -> Stream.of(SUB_TOPOLOGY, SOURCE, PROCESSOR, SINK, RIGHT_ARROW)
                        .filter(itemParser -> itemParser.test(line))
                        .forEachOrdered(itemParser -> itemParser.accept(context)));

        res
                .put("describe", topologyDescription)
                .put("subTopologies", context.subTopologies)
                .put("sources", context.sources)
                .put("sinks", context.sinks)
                .put("stores", context.stores)
                .put("graphviz", context.graphviz.toGraph())
                .put("mermaid", context.mermaid.toGraph());

        return res;
    }

    private interface RawTopologyItemParser extends Predicate<String>, Consumer<TopologyParserContext> {
    }

    private static final RawTopologyItemParser SUB_TOPOLOGY = new RawTopologyItemParser() {
        private final Pattern subTopologyPattern = Pattern.compile("Sub-topology: (?<subTopology>[0-9]*).*");
        private Matcher matcher;

        @Override
        public boolean test(String line) {
            matcher = subTopologyPattern.matcher(line);
            return matcher.matches();
        }

        @Override
        public void accept(TopologyParserContext context) {
            context.addSubTopology(matcher.group("subTopology"));
        }
    };

    private static final RawTopologyItemParser SOURCE = new RawTopologyItemParser() {
        private final Pattern sourcePattern = Pattern
                .compile("Source:\\s+(?<source>\\S+)\\s+\\(topics:\\s+\\[(?<topics>.*)\\]\\).*");
        private Matcher matcher;

        @Override
        public boolean test(String line) {
            matcher = sourcePattern.matcher(line);
            return matcher.matches();
        }

        @Override
        public void accept(TopologyParserContext context) {
            context.addSources(matcher.group("source"), matcher.group("topics").split(","));
        }
    };

    private static final RawTopologyItemParser PROCESSOR = new RawTopologyItemParser() {
        private final Pattern processorPattern = Pattern
                .compile("Processor:\\s+(?<processor>\\S+)\\s+\\(stores:\\s+\\[(?<stores>.*)\\]\\).*");
        private Matcher matcher;
        private String line;

        @Override
        public boolean test(String line) {
            this.line = line;
            matcher = processorPattern.matcher(line);
            return matcher.matches();
        }

        @Override
        public void accept(TopologyParserContext context) {
            context.addStores(matcher.group("stores").split(","), matcher.group("processor"), line.contains("JOIN"));
        }
    };

    private static final RawTopologyItemParser SINK = new RawTopologyItemParser() {
        private final Pattern sinkPattern = Pattern.compile("Sink:\\s+(?<sink>\\S+)\\s+\\(topic:\\s+(?<topic>.*)\\).*");
        private Matcher matcher;

        @Override
        public boolean test(String line) {
            matcher = sinkPattern.matcher(line);
            return matcher.matches();
        }

        @Override
        public void accept(TopologyParserContext context) {
            context.addSink(matcher.group("sink"), matcher.group("topic"));
        }
    };

    private static final RawTopologyItemParser RIGHT_ARROW = new RawTopologyItemParser() {
        private final Pattern rightArrowPattern = Pattern.compile("\\s*-->\\s+(?<targets>.*)");
        private Matcher matcher;

        @Override
        public boolean test(String line) {
            matcher = rightArrowPattern.matcher(line);
            return matcher.matches();
        }

        @Override
        public void accept(TopologyParserContext context) {
            context.addTargets(matcher.group("targets").split(","));
        }
    };
}
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java
new file mode 100644
index 00000000000000..cdbf1ec7c1a51d
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java
@@ -0,0 +1,183 @@
package io.quarkus.kafka.streams.runtime.devui;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class TopologyParserContext {
    String currentNode = "";
    final Set<String> subTopologies = new TreeSet<>();
    final Set<String> sources = new TreeSet<>();
    final Set<String> sinks = new TreeSet<>();
    final Set<String> stores = new TreeSet<>();
    final Graphviz graphviz = new Graphviz();
    final Mermaid mermaid = new Mermaid();

    void addSubTopology(String subTopology) {
        subTopologies.add(subTopology);
        graphviz.addSubTopology(subTopology);
        mermaid.addSubTopology(subTopology);
    }

    void addSink(String sink, String topic) {
        sinks.add(topic);
        currentNode = sink;
        graphviz.addSink(sink, topic);
        mermaid.addSink(sink, topic);
    }

    void addSources(String source, String[] topics) {
        currentNode = source;
        Arrays.stream(topics)
                .map(String::trim).filter(topic -> !topic.isEmpty())
                .forEachOrdered(topic -> {
                    sources.add(topic);
                    graphviz.addSource(source, topic);
                    mermaid.addSource(source, topic);
                });
    }

    void addStores(String[] stores, String processor, boolean join) {
        currentNode = processor;
        Arrays.stream(stores)
                .map(String::trim).filter(store -> !store.isEmpty())
                .forEachOrdered(store -> {
                    this.stores.add(store);
                    graphviz.addStore(store, currentNode, join);
                    mermaid.addStore(store, currentNode, join);
                });
    }

    void addTargets(String[] targets) {
        Arrays.stream(targets)
                .map(String::trim).filter(target -> !("none".equals(target) || target.isEmpty()))
                .forEachOrdered(target -> {
                    graphviz.addTarget(target, currentNode);
                    mermaid.addTarget(target, currentNode);
                });
    }

    static final class Graphviz {
        String currentGraph = "";
        final List<String> nodes = new ArrayList<>();
        final List<String> edges = new ArrayList<>();
        final Map<String, List<String>> subGraphs = new TreeMap<>();

        String toGraph() {
            final var res = new ArrayList<String>();

            res.add("digraph {");
            res.add(" fontname=\"Helvetica\"; fontsize=\"10\";");
            res.add(" node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=\"Helvetica\" fontsize=\"10\"];");
            nodes.forEach(n -> res.add(' ' + n + ';'));
            subGraphs.entrySet().forEach(e -> {
                res.add(" subgraph cluster" + e.getKey() + " {");
                res.add(" label=\"Sub-Topology: " + e.getKey() + "\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";");
                e.getValue().forEach(v -> res.add(" " + v + ';'));
                res.add(" }");
            });
            for (int i = 0; i < subGraphs.size(); i++) {
                res.add(" subgraph cluster" + i + " {");
                res.add(" label=\"Sub-Topology: " + i + "\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";");
                res.add(" }");
            }
            edges.forEach(e -> res.add(' ' + e + ';'));
            res.add("}");

            return String.join("\n", res);
        }

        private void addSubTopology(String subTopology) {
            currentGraph = subTopology;
            subGraphs.put(subTopology, new ArrayList<>());
        }

        private void addSink(String sink, String topic) {
            nodes.add(toId(topic) + " [label=\"" + toLabel(topic) + "\" shape=house margin=\"0,0\"]");
            edges.add(toId(sink) + " -> " + toId(topic));
        }

        private void addSource(String source, String topic) {
            nodes.add(toId(topic) + " [label=\"" + toLabel(topic) + "\" shape=invhouse margin=\"0,0\"]");
            nodes.add(toId(source) + " [label=\"" + toLabel(source) + "\"]");
            edges.add(toId(topic) + " -> " + toId(source));
            subGraphs.get(currentGraph).add(toId(source));
        }

        private void addTarget(String target, String node) {
            nodes.add(toId(target) + " [label=\"" + toLabel(target) + "\"]");
            edges.add(toId(node) + " -> " + toId(target));
            subGraphs.get(currentGraph).add(toId(target));
        }

        private void addStore(String store, String node, boolean join) {
            nodes.add(toId(store) + " [label=\"" + toLabel(store) + "\" shape=cylinder]");
            if (join) {
                edges.add(toId(store) + " -> " + toId(node));
            } else {
                edges.add(toId(node) + " -> " + toId(store));
            }
        }

        private static String toId(String name) {
            return name.replaceAll("-", "_");
        }

        private static String toLabel(String name) {
            return name.replaceAll("-", "\\\\n");
        }
    }

    static final class Mermaid {
        final List<String> endpoints = new ArrayList<>();
        final List<String> subTopologies = new ArrayList<>();

        String toGraph() {
            final var res = new ArrayList<String>();

            res.add("graph TD");
            endpoints.forEach(e -> res.add(' ' + e));
            subTopologies.forEach(s -> res.add(' ' + s));
            if (!subTopologies.isEmpty()) {
                res.add(" end");
            }

            return String.join("\n", res);
        }

        private void addSubTopology(String subTopology) {
            if (!subTopologies.isEmpty()) {
                subTopologies.add("end");
            }
            subTopologies.add("subgraph Sub-Topology: " + subTopology);
        }

        private void addSink(String sink, String topic) {
            endpoints.add(sink + '[' + toName(sink) + "] --> " + topic + '(' + topic + ')');
        }

        private void addSource(String source, String topic) {
            endpoints.add(topic + '[' + topic + "] --> " + source + '(' + toName(source) + ')');
        }

        private void addTarget(String target, String node) {
            subTopologies.add(' ' + node + '[' + toName(node) + "] --> " + target + '(' + toName(target) + ')');
        }

        private void addStore(String store, String node, boolean join) {
            if (join) {
                endpoints.add(store + '[' + toName(store) + "] --> " + node + '(' + toName(node) + ')');
            } else {
                endpoints.add(node + '[' + toName(node) + "] --> " + store + '(' + toName(store) + ')');
            }
        }

        private static String toName(String name) {
            return name.replaceAll("-", "-<br>");
        }
    }
}
diff --git a/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java b/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java
new file mode 100644
index 00000000000000..8c40491616d87e
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java
@@ -0,0 +1,104 @@
package io.quarkus.kafka.streams.runtime.devui;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;

public class KafkaStreamsJsonRPCServiceTest {
    @InjectMocks
    KafkaStreamsJsonRPCService rpcService = new KafkaStreamsJsonRPCService();

    @Test
    public void shouldParsingStayConstant() {
        final var expectedDescribe = "Topologies:\n" +
                "Sub-topology: 0 for global store (will not generate tasks)\n" +
                " Source: KSTREAM-SOURCE-0000000001 (topics: [weather-stations])\n" +
                " --> KTABLE-SOURCE-0000000002\n" +
                " Processor: KTABLE-SOURCE-0000000002 (stores: [weather-stations-STATE-STORE-0000000000])\n" +
                " --> none\n" +
                " <-- KSTREAM-SOURCE-0000000001\n" +
                "Sub-topology: 1\n" +
                " Source: KSTREAM-SOURCE-0000000003 (topics: [temperature-values])\n" +
                " --> KSTREAM-LEFTJOIN-0000000004\n" +
                " Processor: KSTREAM-LEFTJOIN-0000000004 (stores: [])\n" +
                " --> KSTREAM-AGGREGATE-0000000005\n" +
                " <-- KSTREAM-SOURCE-0000000003\n" +
                " Processor: KSTREAM-AGGREGATE-0000000005 (stores: [weather-stations-store])\n" +
                " --> KTABLE-TOSTREAM-0000000006\n" +
                " <-- KSTREAM-LEFTJOIN-0000000004\n" +
                " Processor: KTABLE-TOSTREAM-0000000006 (stores: [])\n" +
                " --> KSTREAM-SINK-0000000007\n" +
                " <-- KSTREAM-AGGREGATE-0000000005\n" +
                " Sink: KSTREAM-SINK-0000000007 (topic: temperatures-aggregated)\n" +
                " <-- KTABLE-TOSTREAM-0000000006";
        final var actual =
                rpcService.parseTopologyDescription(expectedDescribe);

        assertEquals(expectedDescribe, actual.getString("describe"));
        assertEquals("[0, 1]", actual.getString("subTopologies"));
        assertEquals("[temperature-values, weather-stations]", actual.getString("sources"));
        assertEquals("[temperatures-aggregated]", actual.getString("sinks"));
        assertEquals("[weather-stations-STATE-STORE-0000000000, weather-stations-store]", actual.getString("stores"));
        assertEquals("digraph {\n" +
                " fontname=\"Helvetica\"; fontsize=\"10\";\n" +
                " node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=\"Helvetica\" fontsize=\"10\"];\n" +
                " weather_stations [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n" +
                " KSTREAM_SOURCE_0000000001 [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n" +
                " KTABLE_SOURCE_0000000002 [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n" +
                " weather_stations_STATE_STORE_0000000000 [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n" +
                " temperature_values [label=\"temperature\\nvalues\" shape=invhouse margin=\"0,0\"];\n" +
                " KSTREAM_SOURCE_0000000003 [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n" +
                " KSTREAM_LEFTJOIN_0000000004 [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n" +
                " KSTREAM_AGGREGATE_0000000005 [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n" +
                " weather_stations_store [label=\"weather\\nstations\\nstore\" shape=cylinder];\n" +
                " KTABLE_TOSTREAM_0000000006 [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n" +
                " KSTREAM_SINK_0000000007 [label=\"KSTREAM\\nSINK\\n0000000007\"];\n" +
                " temperatures_aggregated [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n" +
                " subgraph cluster0 {\n" +
                " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" +
                " KSTREAM_SOURCE_0000000001;\n" +
                " KTABLE_SOURCE_0000000002;\n" +
                " }\n" +
                " subgraph cluster1 {\n" +
                " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" +
                " KSTREAM_SOURCE_0000000003;\n" +
                " KSTREAM_LEFTJOIN_0000000004;\n" +
                " KSTREAM_AGGREGATE_0000000005;\n" +
                " KTABLE_TOSTREAM_0000000006;\n" +
                " KSTREAM_SINK_0000000007;\n" +
                " }\n" +
                " subgraph cluster0 {\n" +
                " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" +
                " }\n" +
                " subgraph cluster1 {\n" +
                " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" +
                " }\n" +
                " weather_stations -> KSTREAM_SOURCE_0000000001;\n" +
                " KSTREAM_SOURCE_0000000001 -> KTABLE_SOURCE_0000000002;\n" +
                " KTABLE_SOURCE_0000000002 -> weather_stations_STATE_STORE_0000000000;\n" +
                " temperature_values -> KSTREAM_SOURCE_0000000003;\n" +
                " KSTREAM_SOURCE_0000000003 -> KSTREAM_LEFTJOIN_0000000004;\n" +
                " KSTREAM_LEFTJOIN_0000000004 -> KSTREAM_AGGREGATE_0000000005;\n" +
                " KSTREAM_AGGREGATE_0000000005 -> weather_stations_store;\n" +
                " KSTREAM_AGGREGATE_0000000005 -> KTABLE_TOSTREAM_0000000006;\n" +
                " KTABLE_TOSTREAM_0000000006 -> KSTREAM_SINK_0000000007;\n" +
                " KSTREAM_SINK_0000000007 -> temperatures_aggregated;\n" +
                "}", actual.getString("graphviz"));
        assertEquals("graph TD\n" +
                " weather-stations[weather-stations] --> KSTREAM-SOURCE-0000000001(KSTREAM-<br>SOURCE-<br>0000000001)\n" +
                " KTABLE-SOURCE-0000000002[KTABLE-<br>SOURCE-<br>0000000002] --> weather-stations-STATE-STORE-0000000000(weather-<br>stations-<br>STATE-<br>STORE-<br>0000000000)\n" +
                " temperature-values[temperature-values] --> KSTREAM-SOURCE-0000000003(KSTREAM-<br>SOURCE-<br>0000000003)\n" +
                " KSTREAM-AGGREGATE-0000000005[KSTREAM-<br>AGGREGATE-<br>0000000005] --> weather-stations-store(weather-<br>stations-<br>store)\n" +
                " KSTREAM-SINK-0000000007[KSTREAM-<br>SINK-<br>0000000007] --> temperatures-aggregated(temperatures-aggregated)\n" +
                " subgraph Sub-Topology: 0\n" +
                " KSTREAM-SOURCE-0000000001[KSTREAM-<br>SOURCE-<br>0000000001] --> KTABLE-SOURCE-0000000002(KTABLE-<br>SOURCE-<br>0000000002)\n" +
                " end\n" +
                " subgraph Sub-Topology: 1\n" +
                " KSTREAM-SOURCE-0000000003[KSTREAM-<br>SOURCE-<br>0000000003] --> KSTREAM-LEFTJOIN-0000000004(KSTREAM-<br>LEFTJOIN-<br>0000000004)\n" +
                " KSTREAM-LEFTJOIN-0000000004[KSTREAM-<br>LEFTJOIN-<br>0000000004] --> KSTREAM-AGGREGATE-0000000005(KSTREAM-<br>AGGREGATE-<br>0000000005)\n" +
                " KSTREAM-AGGREGATE-0000000005[KSTREAM-<br>AGGREGATE-<br>0000000005] --> KTABLE-TOSTREAM-0000000006(KTABLE-<br>TOSTREAM-<br>0000000006)\n" +
                " KTABLE-TOSTREAM-0000000006[KTABLE-<br>TOSTREAM-<br>0000000006] --> KSTREAM-SINK-0000000007(KSTREAM-<br>SINK-<br>0000000007)\n" +
                " end", actual.getString("mermaid"));
    }
}