Deduplicate

Deduplicate is a unary logical operator (i.e. LogicalPlan) that represents the dropDuplicates operator, which drops duplicate records for a given subset of columns.

Deduplicate has the streaming flag enabled for streaming Datasets.
val uniqueRates = spark.
  readStream.
  format("rate").
  load.
  dropDuplicates("value") // <-- creates Deduplicate logical operator

// Note the streaming flag
scala> println(uniqueRates.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#33L], true // <-- streaming flag enabled
01 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4785f176,rate,List(),None,List(),None,Map(),None), rate, [timestamp#32, value#33L]
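
For comparison, a batch Dataset gives a Deduplicate with the streaming flag disabled. A minimal sketch in spark-shell (the exact plan output, e.g. expression IDs and the number of splits, may differ):

val uniqueIds = spark.range(5).dropDuplicates

scala> println(uniqueIds.queryExecution.logical.numberedTreeString)
00 Deduplicate [id#0L], false // <-- streaming flag disabled
01 +- Range (0, 5, step=1, splits=Some(8))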
Caution: FIXME Example with duplicates across batches to show that Deduplicate keeps state and that the withWatermark operator should also be used to limit how much state is stored (to not cause OOM).
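
A minimal sketch of the idea (the 10-minute threshold and the column subset are illustrative, not prescriptive): defining a watermark on the event-time column and including that column in dropDuplicates lets Spark discard deduplication state for events older than the watermark.

import org.apache.spark.sql.streaming.OutputMode

val dedupedRates = spark.
  readStream.
  format("rate").
  load.
  withWatermark("timestamp", "10 minutes"). // <-- bounds how long Deduplicate keeps state
  dropDuplicates("value", "timestamp")      // <-- event-time column included in the subset

val sq = dedupedRates.
  writeStream.
  format("console").
  outputMode(OutputMode.Append).
  start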
Note: The following code is not supported in Structured Streaming and results in an AnalysisException.

import spark.implicits._
import org.apache.spark.sql.functions.{count, window}
val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "value_count").
  dropDuplicates // <-- after groupBy

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = counts.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Complete).
  start

org.apache.spark.sql.AnalysisException: dropDuplicates is not supported after aggregation on a streaming DataFrame/Dataset;;
Note: The output schema of Deduplicate is exactly the child's output schema.