FlatMapGroupsWithStateExec Unary Physical Operator

FlatMapGroupsWithStateExec is a unary physical operator (aka UnaryExecNode) that is created when the FlatMapGroupsWithStateStrategy execution planning strategy plans a FlatMapGroupsWithState logical operator for execution.

Note
The FlatMapGroupsWithState logical operator is created as the result of the flatMapGroupsWithState (or mapGroupsWithState) operator of KeyValueGroupedDataset.
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
// State function: emits (key, number of values in the group); the Long state is not used
val stateFunc = (key: Long, values: Iterator[(Timestamp, Long)], state: GroupState[Long]) => {
  Iterator((key, values.size))
}
val rateGroups = spark.
  readStream.
  format("rate").
  load.
  withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds").  // required for EventTimeTimeout
  as[(Timestamp, Long)].  // convert the DataFrame to a typed Dataset (spark-shell imports spark.implicits._ for the encoder)
  groupByKey { case (time, value) => value % 2 }. // creates two groups
  flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.EventTimeTimeout)(stateFunc)  // EventTimeTimeout requires watermark (defined above)

// Check out the physical plan with FlatMapGroupsWithStateExec
scala> rateGroups.explain
== Physical Plan ==
*SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#35L, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#36]
+- FlatMapGroupsWithState <function3>, value#30: bigint, newInstance(class scala.Tuple2), [value#30L], [timestamp#20-T10000ms, value#21L], obj#34: scala.Tuple2, StatefulOperatorStateInfo(<unknown>,63491721-8724-4631-b6bc-3bb1edeb4baf,0,0), class[value[0]: bigint], Update, EventTimeTimeout, 0, 0
   +- *Sort [value#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#30L, 200)
         +- AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, bigint, false] AS value#30L]
            +- EventTimeWatermark timestamp#20: timestamp, interval 10 seconds
               +- StreamingRelation rate, [timestamp#20, value#21L]

// Execute the streaming query
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = rateGroups.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).  // Append is not supported
  start

// Eventually...
sq.stop
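Because stateFunc above never reads or updates its GroupState, each trigger reports only the number of rows that arrived for a key in that micro-batch, not a running total. The following plain-Scala sketch (no Spark involved) models that difference under simplifying assumptions: a micro-batch is a Seq of (eventTimeMs, value) pairs, the watermark and timeouts are left out, and in the hypothetical runBatchWithState variant a Map stands in for the per-key GroupState.

```scala
// Plain-Scala model (not Spark) of what the example query computes per trigger.
// Assumption: a micro-batch is a Seq of (eventTimeMs, value); watermark and timeouts are ignored.
object PerTriggerModel {
  // (event time in ms, value): a stand-in for the (Timestamp, Long) rows of the example
  type Row = (Long, Long)

  // Mirrors stateFunc: ignores state and emits (key, number of values seen in this batch)
  def stateless(key: Long, values: Iterator[Row]): Iterator[(Long, Int)] =
    Iterator((key, values.size))

  // One micro-batch: group by value % 2, then call the function once per key that has data
  def runBatch(batch: Seq[Row]): Map[Long, Int] =
    batch
      .groupBy { case (_, value) => value % 2 }
      .flatMap { case (key, rows) => stateless(key, rows.iterator) }

  // Hypothetical stateful variant: a Map plays the role of the per-key GroupState[Long]
  // and keeps a running count; returns (rows emitted this batch, state after the batch)
  def runBatchWithState(batch: Seq[Row], state: Map[Long, Long]): (Map[Long, Long], Map[Long, Long]) = {
    val emitted = batch
      .groupBy { case (_, value) => value % 2 }
      .map { case (key, rows) => key -> (state.getOrElse(key, 0L) + rows.size) }
    (emitted, state ++ emitted)
  }
}
```

With two batches, runBatch reports each batch in isolation, while runBatchWithState carries the counts forward, which is what a state function that calls GroupState.update would do.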
Table 1. FlatMapGroupsWithStateExec’s SQLMetrics

| Name | Description |
|---|---|
| numTotalStateRows | Number of keys in the StateStore. Incremented when FlatMapGroupsWithStateExec is executed (and the iterator has finished generating the rows). |
| stateMemory | Memory used by the StateStore |

Figure 1. FlatMapGroupsWithStateExec in web UI (Details for Query)

FlatMapGroupsWithStateExec is an ObjectProducerExec that…FIXME

FlatMapGroupsWithStateExec is a StateStoreWriter that…FIXME

FlatMapGroupsWithStateExec supports watermark which is…FIXME

Note

FlatMapGroupsWithStateStrategy converts a FlatMapGroupsWithState unary logical operator to a FlatMapGroupsWithStateExec physical operator with undefined StatefulOperatorStateInfo, batchTimestampMs, and eventTimeWatermark.

StatefulOperatorStateInfo, batchTimestampMs, and eventTimeWatermark are defined when the IncrementalExecution query execution pipeline is requested to apply the physical plan preparation rules.
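This two-step lifecycle can be pictured as an immutable operator that is first created with its execution-specific values undefined and later copied with them filled in. The sketch below is a hypothetical, heavily simplified model: FlatMapOp, StateInfo, plan and prepare are illustrative names rather than Spark classes, and the StateInfo fields are assumptions.

```scala
// Hypothetical model of "plan with undefined values, then fill them in before execution".
object TwoPhasePlanning {
  // Stand-in for StatefulOperatorStateInfo (field names are illustrative only)
  final case class StateInfo(checkpointLocation: String, operatorId: Long, batchId: Long)

  // Stand-in for FlatMapGroupsWithStateExec that keeps only the three optional values
  final case class FlatMapOp(
      stateInfo: Option[StateInfo],
      batchTimestampMs: Option[Long],
      eventTimeWatermark: Option[Long])

  // Planning (strategy): nothing execution-specific is known yet
  def plan(): FlatMapOp =
    FlatMapOp(stateInfo = None, batchTimestampMs = None, eventTimeWatermark = None)

  // Preparation (per micro-batch): return a copy with the values defined
  def prepare(op: FlatMapOp, info: StateInfo, batchTimestampMs: Long, watermarkMs: Long): FlatMapOp =
    op.copy(
      stateInfo = Some(info),
      batchTimestampMs = Some(batchTimestampMs),
      eventTimeWatermark = Some(watermarkMs))
}
```

Using copy leaves the planned instance untouched, so the same planned operator can be prepared again with different per-batch values.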

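When executed, FlatMapGroupsWithStateExec requires that the optional values are properly defined for the given timeoutConf: batchTimestampMs for ProcessingTimeTimeout, and eventTimeWatermark for EventTimeTimeout (which also requires a watermark on the input, as in the example above).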
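A minimal sketch of such an up-front check is a match on the timeout configuration. It assumes that batchTimestampMs must be defined for ProcessingTimeTimeout and that EventTimeTimeout needs both an event-time watermark value and a watermark on the input; the Timeout hierarchy and validate are stand-ins, not Spark's GroupStateTimeout API.

```scala
// Simplified model of the early parameter check; the types are stand-ins, not Spark classes.
object TimeoutChecks {
  sealed trait Timeout
  case object NoTimeout extends Timeout
  case object ProcessingTimeTimeout extends Timeout
  case object EventTimeTimeout extends Timeout

  // Fails fast (IllegalArgumentException from require) when a required value is missing
  def validate(
      timeoutConf: Timeout,
      batchTimestampMs: Option[Long],
      eventTimeWatermark: Option[Long],
      hasWatermarkAttribute: Boolean): Unit = timeoutConf match {
    case ProcessingTimeTimeout =>
      require(batchTimestampMs.nonEmpty, "ProcessingTimeTimeout needs batchTimestampMs")
    case EventTimeTimeout =>
      require(eventTimeWatermark.nonEmpty, "EventTimeTimeout needs the event-time watermark value")
      require(hasWatermarkAttribute, "EventTimeTimeout needs a watermark defined on the input")
    case NoTimeout => // nothing to check
  }
}
```

Checking before any data is processed turns a misconfigured query into an immediate error instead of a failure deep inside a task.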

Caution
FIXME Where are the optional values defined?
Table 2. FlatMapGroupsWithStateExec’s Internal Registries and Counters (in alphabetical order)

| Name | Description |
|---|---|
| isTimeoutEnabled | |
| stateAttributes | |
| stateDeserializer | |
| stateSerializer | |
| timestampTimeoutAttribute | |

Tip

Enable INFO logging level for org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec=INFO

Refer to Logging.

keyExpressions Method

Caution
FIXME

Executing FlatMapGroupsWithStateExec — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of the SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).

Internally, doExecute initializes metrics.

doExecute then executes the child physical operator and creates a StateStoreRDD with a storeUpdateFunction that:

  1. Creates a StateStoreUpdater

  2. Filters out rows from Iterator[InternalRow] that match watermarkPredicateForData (when defined and timeoutConf is EventTimeTimeout)

  3. Generates an output Iterator[InternalRow] with elements from StateStoreUpdater's updateStateForKeysWithData and updateStateForTimedOutKeys

  4. In the end, storeUpdateFunction creates a CompletionIterator that executes a completion function (aka completionFunction) once all the elements have been iterated over (i.e. when a client has consumed all the rows). The completion function requests the StateStore to commit and then updates the numTotalStateRows metric with the number of keys in the state store.
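The four steps can be approximated in plain Scala as follows. This is a simplified, single-partition model rather than Spark's implementation: ToyStateStore, Row, StateValue, storeUpdate and completionIterator are hypothetical names, the StateStoreUpdater is folded into storeUpdate, the state is a running count per key, and the late-row and timeout rules (both using `<= watermarkMs`) are assumptions chosen to resemble EventTimeTimeout.

```scala
import scala.collection.mutable

// Simplified single-partition model of storeUpdateFunction (plain Scala, not Spark code).
// Assumptions (all names hypothetical): a row is late when eventTimeMs <= watermarkMs;
// a key times out when its timeout timestamp is <= watermarkMs; state is a running count.
object StoreUpdateModel {
  final case class Row(key: Long, eventTimeMs: Long, value: Long)
  final case class StateValue(count: Long, timeoutAtMs: Long)

  // Stand-in for a StateStore: a mutable map plus a counter of commits
  final class ToyStateStore {
    val data = mutable.Map.empty[Long, StateValue]
    var committedVersions = 0
    def commit(): Unit = committedVersions += 1
    def numKeys: Long = data.size.toLong
  }

  // Runs onComplete exactly once, when hasNext first reports that the iterator is exhausted
  def completionIterator[A](underlying: Iterator[A])(onComplete: => Unit): Iterator[A] =
    new Iterator[A] {
      private var completed = false
      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !completed) { completed = true; onComplete }
        more
      }
      def next(): A = underlying.next()
    }

  def storeUpdate(
      store: ToyStateStore,
      rows: Iterator[Row],
      watermarkMs: Long,
      timeoutDelayMs: Long,
      metrics: mutable.Map[String, Long]): Iterator[(Long, Long)] = {
    // Step 2: filter out late rows (the ones matching the watermark predicate)
    val onTime = rows.filter(_.eventTimeMs > watermarkMs).toList

    // Step 3a: keys with data; the state is updated lazily, as the output is consumed
    val withData = onTime.groupBy(_.key).iterator.map { case (key, keyRows) =>
      val previous = store.data.get(key).map(_.count).getOrElse(0L)
      val updated = StateValue(previous + keyRows.size, keyRows.map(_.eventTimeMs).max + timeoutDelayMs)
      store.data(key) = updated
      (key, updated.count)
    }

    // Step 3b: timed-out keys, evaluated only after withData is exhausted (++ is lazy)
    def timedOut(): Iterator[(Long, Long)] =
      store.data.toList
        .filter { case (_, s) => s.timeoutAtMs <= watermarkMs }
        .iterator
        .map { case (key, s) =>
          store.data.remove(key) // this model's state function drops state of timed-out keys
          (key, s.count)
        }

    // Step 4: commit and update the metric only once every row has been consumed
    completionIterator(withData ++ timedOut()) {
      store.commit()
      metrics("numTotalStateRows") = metrics.getOrElse("numTotalStateRows", 0L) + store.numKeys
    }
  }
}
```

Because the completion logic runs inside hasNext, nothing is committed until the consumer drains the iterator, which is why numTotalStateRows only reflects the store after all rows have been generated.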

Creating FlatMapGroupsWithStateExec Instance

FlatMapGroupsWithStateExec takes the following when created:

  • State function of type (Any, Iterator[Any], LogicalGroupState[Any]) ⇒ Iterator[Any]

  • Key deserializer Catalyst expression

  • Value deserializer Catalyst expression

  • Grouping attributes (as used for grouping in KeyValueGroupedDataset for mapGroupsWithState or flatMapGroupsWithState operators)

  • Data attributes

  • Output object attribute

  • Optional StatefulOperatorStateInfo

  • State ExpressionEncoder

  • OutputMode

  • GroupStateTimeout

  • Optional batchTimestampMs

  • Optional event time watermark

  • Child physical operator

FlatMapGroupsWithStateExec initializes the internal registries and counters.
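Since the operator keeps the state function in the untyped (Any, Iterator[Any], LogicalGroupState[Any]) ⇒ Iterator[Any] shape, a user's typed function has to be adapted to it. One way to do that, assumed here, is an unchecked cast that works because of type erasure. The sketch uses a minimal State trait and SimpleState class as hypothetical stand-ins for LogicalGroupState.

```scala
// Hypothetical stand-ins showing how a typed state function can be stored untyped.
object UntypedStateFunction {
  // Minimal stand-in for LogicalGroupState (not Spark's trait)
  trait State[S] {
    def getOption: Option[S]
    def update(newState: S): Unit
  }

  final class SimpleState[S](private var value: Option[S]) extends State[S] {
    def getOption: Option[S] = value
    def update(newState: S): Unit = value = Some(newState)
  }

  type UntypedFunc = (Any, Iterator[Any], State[Any]) => Iterator[Any]

  // Unchecked cast: no runtime check happens here because of erasure; it stays safe
  // only while callers pass keys, values and state of the original types
  def untyped[K, V, S, U](f: (K, Iterator[V], State[S]) => Iterator[U]): UntypedFunc =
    f.asInstanceOf[UntypedFunc]
}
```

If a caller passed a value of the wrong type, the mismatch would surface as a ClassCastException only when the function runs, not at the cast.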