Topology
is a directed acyclic graph of stream processing nodes that represents the stream processing logic of a Kafka Streams application.
Topology
can be created directly (as part of Low-Level Processor API) or indirectly using Streams DSL — High-Level Stream Processing DSL.
Topology
provides the fluent API to add local and global state stores, sources, processors and sinks to build advanced stream processing graphs.
Topology
takes no arguments when created.
// Created directly
import org.apache.kafka.streams.Topology
val topology = new Topology
// Created using Streams DSL (StreamsBuilder API)
// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
val builder = new StreamsBuilder
val topology = builder.build
scala> :type topology
org.apache.kafka.streams.Topology
Once created, Topology
can be extended with sources, processors (optionally connected to one or more state stores), sinks, with local and global state stores.
scala> :type topology
org.apache.kafka.streams.Topology
import org.apache.kafka.streams.state.Stores
val storeBuilder = Stores
.keyValueStoreBuilder[String, String](
Stores.inMemoryKeyValueStore("in-memory-key-value-store"),
Serdes.String,
Serdes.String)
.withLoggingDisabled // this is for a global table
val sourceName = "demo-source-processor"
val timestampExtractor = null
val keyDeserializer = Serdes.String.deserializer
val valueDeserializer = Serdes.String.deserializer
val topic = "demo-topic"
val processorName = "demo-processor-supplier"
import org.apache.kafka.streams.kstream.internals.KTableSource
val stateUpdateSupplier = new KTableSource[String, String]("store-name")
topology.addGlobalStore(
storeBuilder,
sourceName,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topic,
processorName,
stateUpdateSupplier)
Topology
can be described.
scala> :type topology
org.apache.kafka.streams.Topology
scala> println(topology.describe)
Topologies:
Sub-topology: 0 for global store (will not generate tasks)
Source: demo-source-processor (topics: [demo-topic])
--> demo-processor-supplier
Processor: demo-processor-supplier (stores: [in-memory-key-value-store])
--> none
<-- demo-source-processor
Topology
is a logical representation of a ProcessorTopology.
Method | Description |
---|---|
|
Topology addGlobalStore(
StoreBuilder storeBuilder,
String sourceName,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
String topic,
String processorName,
ProcessorSupplier stateUpdateSupplier)
Topology addGlobalStore(
StoreBuilder storeBuilder,
String sourceName,
TimestampExtractor timestampExtractor,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
String topic,
String processorName,
ProcessorSupplier stateUpdateSupplier) Adds a global StateStore (with the StoreBuilder, ProcessorSupplier and optional TimestampExtractor) to the topology. Internally, |
|
Topology addProcessor(
String name,
ProcessorSupplier supplier,
String... parentNames) Adds a new processor node (with the ProcessorSupplier) to the topology Internally, |
|
Topology addSink(
String name,
String topic,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
StreamPartitioner<? super K, ? super V> partitioner,
String... parentNames)
Topology addSink(
String name,
String topic,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
String... parentNames)
Topology addSink(
String name,
String topic,
StreamPartitioner<? super K, ? super V> partitioner,
String... parentNames)
Topology addSink(
String name,
String topic,
String... parentNames)
Topology addSink(
String name,
TopicNameExtractor<K, V> topicExtractor,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
StreamPartitioner<? super K, ? super V> partitioner,
String... parentNames)
Topology addSink(
String name,
TopicNameExtractor<K, V> topicExtractor,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
String... parentNames)
Topology addSink(
String name,
TopicNameExtractor<K, V> topicExtractor,
StreamPartitioner<? super K, ? super V> partitioner,
String... parentNames)
Topology addSink(
String name,
TopicNameExtractor<K, V> topicExtractor,
String... parentNames) Adds a new sink node (with the optional TopicNameExtractor and StreamPartitioner) to the topology. Internally, |
|
Topology addSource(
AutoOffsetReset offsetReset,
String name,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
Pattern topicPattern)
Topology addSource(
AutoOffsetReset offsetReset,
String name,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
String... topics)
Topology addSource(
AutoOffsetReset offsetReset,
String name,
Pattern topicPattern)
Topology addSource(
AutoOffsetReset offsetReset,
String name,
String... topics)
Topology addSource(
AutoOffsetReset offsetReset,
String name,
TimestampExtractor timestampExtractor,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
Pattern topicPattern)
Topology addSource(
AutoOffsetReset offsetReset,
String name,
TimestampExtractor timestampExtractor,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
String... topics)
Topology addSource(
AutoOffsetReset offsetReset,
TimestampExtractor timestampExtractor,
String name,
Pattern topicPattern)
Topology addSource(
AutoOffsetReset offsetReset,
TimestampExtractor timestampExtractor,
String name,
String... topics)
Topology addSource(
String name,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
Pattern topicPattern)
Topology addSource(
String name,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
String... topics)
Topology addSource(
String name,
Pattern topicPattern)
Topology addSource(
String name,
String... topics)
Topology addSource(
TimestampExtractor timestampExtractor,
String name,
Pattern topicPattern)
Topology addSource(
TimestampExtractor timestampExtractor,
String name,
String... topics) Adds a new source node (with the optional AutoOffsetReset and TimestampExtractor) to the topology. Internally, |
|
Topology addStateStore(
StoreBuilder storeBuilder,
String... processorNames) Adds a new state store (as a StoreBuilder) to the topology and associates it with processors Internally, |
|
Topology connectProcessorAndStateStores(
String processorName,
String... stateStoreNames) Connects the processor node with state stores (by name). Internally, |
|
TopologyDescription describe() Describes the topology via TopologyDescription (meta representation) Internally, |
Internally, Topology
uses an InternalTopologyBuilder for all the methods and is simply a thin layer atop (that aims at making Kafka Streams developers' life simpler).
Topology
defines offset reset policy (AutoOffsetReset
) that can be one of the following values: