KStream is the abstraction of a record stream (of key-value pairs).
KStream can be created directly from one or many Kafka topics (using the StreamsBuilder.stream operator) or as a result of transformations on an existing KStream.
Tip
|
Use the Scala API for Kafka Streams to make your Kafka Streams development more pleasant if Scala is your programming language. |
// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
val builder = new StreamsBuilder
// Use type annotation to describe the stream, i.e. stream[String, String]
// Otherwise, Scala's type inference gives us a stream of "nothing", i.e. KStream[Nothing, Nothing]
val input = builder.stream[String, String]("input")
scala> :type input
org.apache.kafka.streams.scala.kstream.KStream[String,String]
KStream comes with a rich set of operators (aka KStream API) that allow for building topologies to consume, process and produce key-value records.
Operator | Description | ||
---|---|---|---|
|
KStream<K, V>[] branch(
Predicate<? super K, ? super V>... predicates) |
||
|
KStream<K, V> filter(
Predicate<? super K, ? super V> predicate) |
||
|
KStream<K, V> filterNot(
Predicate<? super K, ? super V> predicate) |
||
|
KStream<KR, VR> flatMap(
KeyValueMapper<
? super K,
? super V,
? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) |
||
|
KStream<K, VR> flatMapValues(
ValueMapper<
? super V,
? extends Iterable<? extends VR>> mapper)
KStream<K, VR> flatMapValues(
ValueMapperWithKey<
? super K,
? super V,
? extends Iterable<? extends VR>> mapper) |
||
|
KStream<K1, V1> flatTransform(
TransformerSupplier<
? super K,
? super V,
Iterable<KeyValue<K1, V1>>> transformerSupplier,
String... stateStoreNames) |
||
|
KStream<K, VR> flatTransformValues(
ValueTransformerSupplier<
? super V,
Iterable<VR>> valueTransformerSupplier,
String... stateStoreNames)
KStream<K, VR> flatTransformValues(
ValueTransformerWithKeySupplier<
? super K,
? super V,
Iterable<VR>> valueTransformerSupplier,
String... stateStoreNames) |
||
|
void foreach(
ForeachAction<? super K, ? super V> action) |
||
|
KGroupedStream<KR, V> groupBy(
KeyValueMapper<? super K, ? super V, KR> selector)
KGroupedStream<KR, V> groupBy(
KeyValueMapper<? super K, ? super V, KR> selector,
Grouped<KR, V> grouped) Creates a KGroupedStream with a given KeyValueMapper |
||
|
KGroupedStream<K, V> groupByKey()
KGroupedStream<K, V> groupByKey(
Grouped<K, V> grouped) Creates a KGroupedStream |
||
|
KStream<K, RV> join(
GlobalKTable<GK, GV> globalKTable,
KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
ValueJoiner<? super V, ? super GV, ? extends RV> joiner)
KStream<K, VR> join(
KStream<K, VO> otherStream,
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows)
KStream<K, VR> join(
KStream<K, VO> otherStream,
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows,
Joined<K, V, VO> joined)
KStream<K, VR> join(
KTable<K, VT> table,
ValueJoiner<? super V, ? super VT, ? extends VR> joiner)
KStream<K, VR> join(
KTable<K, VT> table,
ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
Joined<K, V, VT> joined) |
||
|
KStream<K, RV> leftJoin(
GlobalKTable<GK, GV> globalKTable,
KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner)
KStream<K, VR> leftJoin(
KStream<K, VO> otherStream,
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows)
KStream<K, VR> leftJoin(
KStream<K, VO> otherStream,
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows,
Joined<K, V, VO> joined)
KStream<K, VR> leftJoin(
KTable<K, VT> table,
ValueJoiner<? super V, ? super VT, ? extends VR> joiner)
KStream<K, VR> leftJoin(
KTable<K, VT> table,
ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
Joined<K, V, VT> joined) |
||
|
KStream<KR, VR> map(
KeyValueMapper<
? super K,
? super V,
? extends KeyValue<? extends KR, ? extends VR>> mapper) |
||
|
KStream<K, VR> mapValues(
ValueMapper<? super V, ? extends VR> mapper)
KStream<K, VR> mapValues(
ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) |
||
|
KStream<K, V> merge(
KStream<K, V> stream) |
||
|
KStream<K, VR> outerJoin(
KStream<K, VO> otherStream,
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows)
KStream<K, VR> outerJoin(
KStream<K, VO> otherStream,
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
JoinWindows windows,
Joined<K, V, VO> joined) |
||
|
KStream<K, V> peek(
ForeachAction<? super K, ? super V> action) |
||
|
void print(
Printed<K, V> printed) |
||
|
void process(
ProcessorSupplier<? super K, ? super V> processorSupplier,
String... stateStoreNames) |
||
|
KStream<KR, V> selectKey(
KeyValueMapper<
? super K,
? super V,
? extends KR> mapper) |
||
|
KStream<K, V> through(
String topic)
KStream<K, V> through(
String topic,
Produced<K, V> produced) Materializes the stream to a given topic (passes it through) and creates a new KStream from that topic |
||
|
void to(
String topic)
void to(
String topic,
Produced<K, V> produced)
void to(
TopicNameExtractor<K, V> topicExtractor)
void to(
TopicNameExtractor<K, V> topicExtractor,
Produced<K, V> produced) Produces records to a given topic or using dynamic routing based on TopicNameExtractor
|
||
|
KStream<K1, V1> transform(
TransformerSupplier<
? super K,
? super V,
KeyValue<K1, V1>> transformerSupplier,
String... stateStoreNames) Stateful record transformation |
||
|
KStream<K, VR> transformValues(
ValueTransformerSupplier<
? super V,
? extends VR> valueTransformerSupplier,
String... stateStoreNames)
KStream<K, VR> transformValues(
ValueTransformerWithKeySupplier<
? super K,
? super V,
? extends VR> valueTransformerSupplier,
String... stateStoreNames) Stateful record-by-record value transformation
|
Note
|
KStreamImpl is the one and only known implementation of the KStream Contract in Kafka Streams. |