EventTimeWatermark is a unary logical operator (i.e. UnaryNode) that is created as the result of the withWatermark operator.
val q = spark.
readStream.
format("rate").
load.
withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'EventTimeWatermark 'timestamp, interval 30 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3d97b0a,rate,List(),None,List(),None,Map(),None), rate, [timestamp#10, value#11L]
== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint
EventTimeWatermark timestamp#10: timestamp, interval 30 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3d97b0a,rate,List(),None,List(),None,Map(),None), rate, [timestamp#10, value#11L]
== Optimized Logical Plan ==
EventTimeWatermark timestamp#10: timestamp, interval 30 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3d97b0a,rate,List(),None,List(),None,Map(),None), rate, [timestamp#10, value#11L]
== Physical Plan ==
EventTimeWatermark timestamp#10: timestamp, interval 30 seconds
+- StreamingRelation rate, [timestamp#10, value#11L]
EventTimeWatermark uses the spark.watermarkDelayMs key (in the Metadata of the eventTime column in the output) to hold the event-time watermark delay.
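The metadata key is also available programmatically as EventTimeWatermark.delayKey (a sketch, assuming spark-catalyst is on the classpath):

```scala
// Sketch: the metadata key is exposed as EventTimeWatermark.delayKey
// (requires the spark-catalyst dependency on the classpath)
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark

println(EventTimeWatermark.delayKey) // spark.watermarkDelayMs
```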
Note: The event-time watermark delay is used to calculate the difference between the event time of an event (that is modelled as a row in the Dataset of a streaming batch) and the time in the past.
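To make the note concrete, here is a minimal, Spark-independent sketch (the names are hypothetical, not Spark APIs) of the idea that a watermark trails the maximum event time observed so far by the configured delay:

```scala
// Hypothetical sketch (not a Spark API): the watermark trails the
// maximum event time observed so far by the configured delay.
object WatermarkSketch {
  def updatedWatermarkMs(maxEventTimeMs: Long, delayThresholdMs: Long): Long =
    math.max(maxEventTimeMs - delayThresholdMs, 0L)

  def main(args: Array[String]): Unit = {
    // events seen up to 00:01:00, delay of 30 seconds => watermark at 00:00:30
    println(WatermarkSketch.updatedWatermarkMs(60000L, 30000L))
  }
}
```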
Note: For batch (non-streaming) queries, the EliminateEventTimeWatermark logical optimization rule removes EventTimeWatermark from a logical plan immediately.

val logs = spark.
  read. // <-- batch non-streaming query that makes `EliminateEventTimeWatermark` rule applicable
  format("text").
  load("logs")

// logs is a batch Dataset
scala> logs.isStreaming
res0: Boolean = false

val q = logs.
  withWatermark(eventTime = "timestamp", delayThreshold = "30 seconds") // <-- creates EventTimeWatermark

scala> println(q.queryExecution.logical.numberedTreeString) // <-- no EventTimeWatermark as it was removed immediately
00 Relation[value#0] text
Note: EventTimeWatermark is converted (aka planned) to the EventTimeWatermarkExec physical operator in the StatefulAggregationStrategy execution planning strategy.
output: Seq[Attribute]
Note: output is a part of the QueryPlan Contract to describe the attributes of (the schema of) the output.
output finds the eventTime column in the child's output schema and updates the Metadata of the column with the spark.watermarkDelayMs key and the delay in milliseconds.

output removes the spark.watermarkDelayMs key from the other columns.
// See q created above
// FIXME How to access/show the eventTime column with the metadata updated to include spark.watermarkDelayMs?
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
val etw = q.queryExecution.logical.asInstanceOf[EventTimeWatermark]
scala> etw.output.toStructType.printTreeString
root
|-- timestamp: timestamp (nullable = true)
|-- value: long (nullable = true)
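One way to address the FIXME above could be to inspect the Metadata of the eventTime attribute directly. A sketch, assuming the streaming query q and the etw operator created above (Attribute.metadata and Metadata.json are the assumed access points):

```scala
// Sketch: print the Metadata of the eventTime attribute
// (assumes `etw` from the example above)
etw.output.
  filter(_.name == "timestamp").
  foreach(a => println(a.metadata.json))
// the JSON should include the spark.watermarkDelayMs key with the delay in milliseconds
```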