Append output mode requires that a streaming aggregation defines an event-time watermark (using the withWatermark operator) on at least one of the grouping expressions (directly or using the window function).

Note: The withWatermark operator has to be used before the aggregation operator (for the watermark to be used).
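For example, a minimal sketch (separate from the demo below, using the built-in rate source purely for illustration) that satisfies the requirement defines the watermark before the aggregation and groups by a window over the watermarked event-time column:

```scala
// Minimal sketch only: rate source and window duration are illustrative choices
import org.apache.spark.sql.functions.window
import spark.implicits._

val counts = spark.
  readStream.
  format("rate").                              // any source with an event-time column
  load.
  withWatermark("timestamp", "10 seconds").    // <-- watermark before groupBy
  groupBy(window($"timestamp", "5 seconds")).  // <-- group by a window over the event time
  count
```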
In Append output mode the current watermark level is used to:

* output (finalize) saved state rows whose event time is below the watermark (Expired State)
* drop incoming events whose event time is below the watermark (Late Events)
* keep the remaining aggregation state across batches (Saved State Rows)

Note: Sorting is only supported on streaming aggregated Datasets with Complete output mode.
| Batch / Events | Current Watermark Level [ms] | Expired State, Late Events and Saved State Rows |
|---|---|---|
| … | 0 | Saved State Rows |
| … | 5000 (Maximum event time …) | Expired State, Late Events, Saved State Rows |
| … | 25000 (Maximum event time from the previous batch is …) | Expired State, Late Events, Saved State Rows |
| … | 25000 (Maximum event time from the previous batch is …) | Saved State Rows |
| … | 26000 (Maximum event time from the previous batch is …) | Expired State, Saved State Rows |
Note: The event-time watermark may advance based on the maximum event time of the previous events (of the previous batch, to be precise, as the level advances at every trigger so the earlier levels are already accounted for).
Note: The event-time watermark can only change when the maximum event time minus the delayThreshold (as defined using the withWatermark operator) is bigger than the current watermark.
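In other words, at every trigger the new level is (conceptually) the maximum of the current level and the maximum event time minus the delayThreshold. A simplified sketch of that rule (the helper below is illustrative, not Spark's internal API; all values in milliseconds):

```scala
// Illustrative helper, not Spark's internal API
def nextWatermarkMs(currentMs: Long, maxEventTimeMs: Long, delayThresholdMs: Long): Long =
  math.max(currentMs, maxEventTimeMs - delayThresholdMs)

nextWatermarkMs(currentMs = 5000, maxEventTimeMs = 35000, delayThresholdMs = 10000)
// 25000 -- the watermark advances
nextWatermarkMs(currentMs = 25000, maxEventTimeMs = 26000, delayThresholdMs = 10000)
// 25000 -- the watermark does not change (26000 - 10000 < 25000)
```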
Tip: Use the following to publish events to Kafka.
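For example, events in the seconds,id,batch format expected by the demo could be published straight from spark-shell using the batch Kafka sink (a sketch; the publish helper is hypothetical and assumes a Kafka broker at localhost:9092 and the topic1 topic used below):

```scala
// Hypothetical helper: publishes a single "seconds,id,batch" record to topic1
// using Spark's batch Kafka sink (requires the spark-sql-kafka-0-10 package).
import spark.implicits._

def publish(seconds: Long, id: Long, batch: Long): Unit =
  Seq(s"$seconds,$id,$batch").toDF("value").
    write.
    format("kafka").
    option("kafka.bootstrap.servers", "localhost:9092").
    option("topic", "topic1").
    save

// e.g.
// publish(seconds = 15, id = 1, batch = 1)
```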
/**
* Reading datasets with records from a Kafka topic
*/
/**
TIP (only when working with SNAPSHOT version)
Remove the SNAPSHOT package from the local cache
rm -rf \
~/.ivy2/cache/org.apache.spark \
~/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0-SNAPSHOT.jar
*/
/**
TIP: Start spark-shell with spark-sql-kafka-0-10 package
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT
*/
/**
TIP: Copy the following code to append.txt and use :load command in spark-shell to load it
:load append.txt
*/
// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring it easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
scala> spark.sessionState.conf.numShufflePartitions
res1: Int = 1
// END: Only for easier debugging
// Use streaming aggregation with groupBy operator to have StateStoreSaveExec operator
// Since the demo uses Append output mode
// it has to define a streaming event time watermark using withWatermark operator
// UnsupportedOperationChecker makes sure that the requirement holds
val idsPerBatch = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
withColumn("tokens", split('value, ",")).
withColumn("seconds", 'tokens(0) cast "long").
withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- Event time has to be a timestamp
withColumn("id", 'tokens(1)).
withColumn("batch", 'tokens(2) cast "int").
withWatermark(eventTime = "event_time", delayThreshold = "10 seconds"). // <-- define watermark (before groupBy!)
groupBy($"event_time"). // <-- use event_time for grouping
agg(collect_list("batch") as "batches", collect_list("id") as "ids").
withColumn("event_time", to_timestamp($"event_time")) // <-- convert to human-readable date
// idsPerBatch is a streaming Dataset with just one Kafka source
// so it knows nothing about output mode or the current streaming watermark yet
// - Output mode is defined on writing side
// - streaming watermark is read from rows at runtime
// That's why StatefulOperatorStateInfo is generic (and uses the default Append for output mode)
// and no batch-specific values are printed out
// They will be available right after the first streaming batch
// Use explain on a streaming query to know the trigger-specific values
scala> idsPerBatch.explain
== Physical Plan ==
*Project [event_time#36-T10000ms AS event_time#97, batches#90, ids#92]
+- ObjectHashAggregate(keys=[event_time#36-T10000ms], functions=[collect_list(batch#61, 0, 0), collect_list(id#48, 0, 0)])
+- Exchange hashpartitioning(event_time#36-T10000ms, 1)
+- StateStoreSave [event_time#36-T10000ms], StatefulOperatorStateInfo(<unknown>,7c5641eb-8ff9-447b-b9ba-b347c057d08f,0,0), Append, 0
+- ObjectHashAggregate(keys=[event_time#36-T10000ms], functions=[merge_collect_list(batch#61, 0, 0), merge_collect_list(id#48, 0, 0)])
+- Exchange hashpartitioning(event_time#36-T10000ms, 1)
+- StateStoreRestore [event_time#36-T10000ms], StatefulOperatorStateInfo(<unknown>,7c5641eb-8ff9-447b-b9ba-b347c057d08f,0,0)
+- ObjectHashAggregate(keys=[event_time#36-T10000ms], functions=[merge_collect_list(batch#61, 0, 0), merge_collect_list(id#48, 0, 0)])
+- Exchange hashpartitioning(event_time#36-T10000ms, 1)
+- ObjectHashAggregate(keys=[event_time#36-T10000ms], functions=[partial_collect_list(batch#61, 0, 0), partial_collect_list(id#48, 0, 0)])
+- EventTimeWatermark event_time#36: timestamp, interval 10 seconds
+- *Project [cast(from_unixtime(cast(split(cast(value#1 as string), ,)[0] as bigint), yyyy-MM-dd HH:mm:ss, Some(Europe/Berlin)) as timestamp) AS event_time#36, split(cast(value#1 as string), ,)[1] AS id#48, cast(split(cast(value#1 as string), ,)[2] as int) AS batch#61]
+- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
// Start the query and hence StateStoreSaveExec
// Note Append output mode
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = idsPerBatch.
writeStream.
format("console").
option("truncate", false).
trigger(Trigger.ProcessingTime(5.seconds)).
outputMode(OutputMode.Append). // <-- Append output mode
start
-------------------------------------------
Batch: 0
-------------------------------------------
+----------+-------+---+
|event_time|batches|ids|
+----------+-------+---+
+----------+-------+---+
// there's only 1 stateful operator and hence 0 for the index in stateOperators
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
"numRowsTotal" : 0,
"numRowsUpdated" : 0,
"memoryUsedBytes" : 77
}
// Current watermark
// We've just started so it's the default start time
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:00.000Z
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------+---+
|event_time|batches|ids|
+----------+-------+---+
+----------+-------+---+
// it's Append output mode so numRowsTotal is...FIXME
// no keys were available earlier (it's just started!) and so numRowsUpdated is 0
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
"numRowsTotal" : 2,
"numRowsUpdated" : 2,
"memoryUsedBytes" : 669
}
// Current watermark
// One streaming batch has passed so it's still the default start time
// that will get changed the next streaming batch
// watermark is always one batch behind
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:00.000Z
// Could be 0 if the time to update the lastProgress is short
// FIXME Explain it in detail
scala> println(sq.lastProgress.numInputRows)
2
-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------+---+
|event_time |batches|ids|
+-------------------+-------+---+
|1970-01-01 01:00:01|[1] |[1]|
+-------------------+-------+---+
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
"numRowsTotal" : 2,
"numRowsUpdated" : 2,
"memoryUsedBytes" : 701
}
// Current watermark
// Updated and so the output with the final aggregation (aka expired state)
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:05.000Z
scala> println(sq.lastProgress.numInputRows)
3
-------------------------------------------
Batch: 3
-------------------------------------------
+-------------------+-------+------+
|event_time |batches|ids |
+-------------------+-------+------+
|1970-01-01 01:00:15|[2, 1] |[2, 2]|
+-------------------+-------+------+
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
"numRowsTotal" : 2,
"numRowsUpdated" : 1,
"memoryUsedBytes" : 685
}
// Current watermark
// Updated and so the output with the final aggregation (aka expired state)
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:25.000Z
scala> println(sq.lastProgress.numInputRows)
4
-------------------------------------------
Batch: 4
-------------------------------------------
+----------+-------+---+
|event_time|batches|ids|
+----------+-------+---+
+----------+-------+---+
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
"numRowsTotal" : 3,
"numRowsUpdated" : 1,
"memoryUsedBytes" : 965
}
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:25.000Z
scala> println(sq.lastProgress.numInputRows)
1
// publish new records
// See the events table above
-------------------------------------------
Batch: 5
-------------------------------------------
+-------------------+-------+---+
|event_time |batches|ids|
+-------------------+-------+---+
|1970-01-01 01:00:26|[3] |[4]|
+-------------------+-------+---+
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
"numRowsTotal" : 3,
"numRowsUpdated" : 1,
"memoryUsedBytes" : 997
}
// Current watermark
// Updated and so the output with the final aggregation (aka expired state)
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:26.000Z
scala> println(sq.lastProgress.numInputRows)
1
// In the end...
sq.stop