Skip to content

Latest commit



225 lines (198 loc) · 8.94 KB


File metadata and controls

225 lines (198 loc) · 8.94 KB

EventTimeWatermarkExec Unary Physical Operator for Accumulating Event Time Watermark

EventTimeWatermarkExec is a unary physical operator (aka UnaryExecNode) that accumulates the event time values (that appear in eventTime watermark column).


EventTimeWatermarkExec uses eventTimeStats accumulator to send the statistics (i.e. the maximum, minimum, average and count) for the event time column in a streaming batch that is later used in:

EventTimeWatermarkExec is created when StatefulAggregationStrategy execution planning strategy plans a EventTimeWatermark logical operator for execution.

EventTimeWatermark logical operator is created as the result of withWatermark operator.
val rates = spark.
  withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds") // <-- use EventTimeWatermark logical operator
scala> rates.explain
== Physical Plan ==
EventTimeWatermark timestamp#0: timestamp, interval 10 seconds
+- StreamingRelation rate, [timestamp#0, value#1L]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = rates.
  option("truncate", false).
17/08/11 09:04:17 INFO StreamExecution: Starting rates-to-console [id = ec8f8228-90f6-4e1f-8ad2-80222affed63, runId = f605c134-cfb0-4378-88c1-159d8a7c232e] with file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-3869a982-9824-4715-8cce-cce7c8251299 to store the query checkpoint.
Batch: 0
17/08/11 09:04:17 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] -> 0),ArrayBuffer(),Map(watermark -> 1970-01-01T00:00:00.000Z))
17/08/11 09:04:17 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:17.373Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 38,
    "getBatch" : 1,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 62,
    "walCommit" : 19
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"  // <-- no watermark found yet
17/08/11 09:04:17 DEBUG StreamExecution: batch 0 committed
Batch: 1
|timestamp              |value|
|2017-08-11 09:04:17.282|0    |
|2017-08-11 09:04:18.282|1    |
17/08/11 09:04:20 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] -> 2),ArrayBuffer(),Map(max -> 2017-08-11T07:04:18.282Z, min -> 2017-08-11T07:04:17.282Z, avg -> 2017-08-11T07:04:17.782Z, watermark -> 1970-01-01T00:00:00.000Z))
// Notice eventTimeStats in eventTime section below
// They are only available when watermark is used and
// EventTimeWatermarkExec.eventTimeStats.value.count > 0, i.e.
// there were input rows (with event time)
// Note that watermark has NOT been changed yet (perhaps it should have)
17/08/11 09:04:20 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:20.004Z",
  "batchId" : 1,
  "numInputRows" : 2,
  "inputRowsPerSecond" : 0.7601672367920943,
  "processedRowsPerSecond" : 25.31645569620253,
  "durationMs" : {
    "addBatch" : 48,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 79,
    "walCommit" : 23
  "eventTime" : {
    "avg" : "2017-08-11T07:04:17.782Z",
    "max" : "2017-08-11T07:04:18.282Z",
    "min" : "2017-08-11T07:04:17.282Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
17/08/11 09:04:20 DEBUG StreamExecution: batch 1 committed
// At long last!
// I think it should have been a batch earlier
// I did ask about it on the dev mailing list today (on 17/08/11)
17/08/11 09:04:30 DEBUG StreamExecution: Observed event time stats: EventTimeStats(1502435058282,1502435057282,1.502435057782E12,2)
17/08/11 09:04:30 INFO StreamExecution: Updating eventTime watermark to: 1502435048282 ms
Batch: 2
|timestamp              |value|
|2017-08-11 09:04:19.282|2    |
|2017-08-11 09:04:20.282|3    |
|2017-08-11 09:04:21.282|4    |
|2017-08-11 09:04:22.282|5    |
|2017-08-11 09:04:23.282|6    |
|2017-08-11 09:04:24.282|7    |
|2017-08-11 09:04:25.282|8    |
|2017-08-11 09:04:26.282|9    |
|2017-08-11 09:04:27.282|10   |
|2017-08-11 09:04:28.282|11   |
17/08/11 09:04:30 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] -> 10),ArrayBuffer(),Map(max -> 2017-08-11T07:04:28.282Z, min -> 2017-08-11T07:04:19.282Z, avg -> 2017-08-11T07:04:23.782Z, watermark -> 2017-08-11T07:04:08.282Z))
17/08/11 09:04:30 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:30.003Z",
  "batchId" : 2,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 1.000100010001,
  "processedRowsPerSecond" : 56.17977528089888,
  "durationMs" : {
    "addBatch" : 147,
    "getBatch" : 6,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 178,
    "walCommit" : 22
  "eventTime" : {
    "avg" : "2017-08-11T07:04:23.782Z",
    "max" : "2017-08-11T07:04:28.282Z",
    "min" : "2017-08-11T07:04:19.282Z",
    "watermark" : "2017-08-11T07:04:08.282Z"
17/08/11 09:04:30 DEBUG StreamExecution: batch 2 committed

// In the end, stop the streaming query
Table 1. EventTimeWatermarkExec’s Internal Registries and Counters
Name Description



Used when…​FIXME


EventTimeStatsAccum accumulator to accumulate eventTime values from every row in a streaming batch (when EventTimeWatermarkExec is executed).

EventTimeStatsAccum is a Spark accumulator of EventTimeStats from Longs (i.e. AccumulatorV2[Long, EventTimeStats]).
Every Spark accumulator has to be registered before use, and eventTimeStats is registered when EventTimeWatermarkExec is created.

Executing EventTimeWatermarkExec (And Collecting Event Times) — doExecute Method

doExecute(): RDD[InternalRow]
doExecute is a part of SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).

Internally, doExecute executes child physical operator and maps over the partitions (using RDD.mapPartitions) that does the following:

  1. Creates an unsafe projection for eventTime in the output schema of child physical operator.

  2. For every row (as InternalRow)

Creating EventTimeWatermarkExec Instance

EventTimeWatermarkExec takes the following when created:

  • Name of the eventTime watermark column

  • Delay CalendarInterval

  • Child physical plan

While being created, EventTimeWatermarkExec registers eventTimeStats accumulator (with the current SparkContext).

EventTimeWatermarkExec initializes the internal registries and counters.