KafkaOffsetReader

KafkaOffsetReader is created when KafkaSourceProvider creates a KafkaSource and when KafkaRelation reads records from Kafka, i.e. whenever offsets have to be fetched from Kafka brokers.

Table 1. KafkaOffsetReader’s Options

Name                         Default Value  Description
fetchOffset.numRetries       3              Number of times to retry an offset fetch before giving up.
fetchOffset.retryIntervalMs  1000           How long (in milliseconds) to wait before retries.
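The two options above combine into a simple retry loop. The following is a minimal sketch of that behavior (a hypothetical fetchWithRetries helper for illustration, not Spark’s actual code):

```scala
// Hypothetical helper showing how fetchOffset.numRetries and
// fetchOffset.retryIntervalMs could drive a retry loop
// (a sketch, not Spark's actual implementation).
def fetchWithRetries[T](numRetries: Int = 3, retryIntervalMs: Long = 1000)(body: => T): T = {
  var attempt = 0
  var result: Option[T] = None
  while (result.isEmpty) {
    try {
      result = Some(body)
    } catch {
      case e: Exception if attempt < numRetries =>
        attempt += 1
        Thread.sleep(retryIntervalMs)  // back off before the next attempt
    }
  }
  result.get
}
```

With the defaults, a transient failure is retried up to 3 times, sleeping 1000 ms between attempts; once the retries are exhausted, the exception propagates to the caller.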

KafkaOffsetReader also defines the fixed schema of the Kafka source.

Table 2. KafkaOffsetReader’s Internal Registries and Counters

Name                          Description
consumer                      Kafka’s Consumer (with keys and values of Array[Byte] type). Initialized when KafkaOffsetReader is created. Used whenever KafkaOffsetReader fetches offsets or topic partitions from Kafka.
execContext                   ExecutionContext (backed by kafkaReaderThread) used to run Kafka calls off the calling thread.
groupId                       Group id of the Kafka consumer (unique per reader).
kafkaReaderThread             Dedicated single-thread executor that runs all Kafka consumer calls.
maxOffsetFetchAttempts        The value of the fetchOffset.numRetries option.
nextId                        Counter used to generate unique group ids.
offsetFetchAttemptIntervalMs  The value of the fetchOffset.retryIntervalMs option.

Tip

Enable INFO or DEBUG logging levels for org.apache.spark.sql.kafka010.KafkaOffsetReader to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=DEBUG

Refer to Logging.

nextGroupId Internal Method

Caution
FIXME

resetConsumer Internal Method

Caution
FIXME

fetchTopicPartitions Method

fetchTopicPartitions(): Set[TopicPartition]
Caution
FIXME
Note
fetchTopicPartitions is used when KafkaRelation gets partition offsets (getPartitionOffsets).

Fetching Earliest Offsets — fetchEarliestOffsets Method

fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
Caution
FIXME
Note
fetchEarliestOffsets is used when KafkaSource applies rate limits (rateLimit) and generates a DataFrame for a batch (when new partitions have been assigned).

Fetching Latest Offsets — fetchLatestOffsets Method

fetchLatestOffsets(): Map[TopicPartition, Long]
Caution
FIXME
Note
fetchLatestOffsets is used when KafkaSource gets offsets (getOffset) or initializes initialPartitionOffsets.

withRetriesWithoutInterrupt Internal Method

withRetriesWithoutInterrupt(body: => Map[TopicPartition, Long]): Map[TopicPartition, Long]
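The name suggests that body is retried while shielding the Kafka consumer from thread interrupts. A simplified sketch of that idea follows (the real implementation differs; the maxAttempts and intervalMs parameters stand in for maxOffsetFetchAttempts and offsetFetchAttemptIntervalMs from Table 2):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Dedicated single thread for Kafka calls (cf. kafkaReaderThread in Table 2).
val singleThread = Executors.newSingleThreadExecutor()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(singleThread)

// Simplified sketch: run `body` on the dedicated thread so an interrupt
// on the calling thread cannot reach the Kafka consumer mid-call,
// and retry on failure up to maxAttempts times.
def withRetriesWithoutInterrupt[T](maxAttempts: Int, intervalMs: Long)(body: => T): T = {
  var attempt = 1
  while (true) {
    try {
      return Await.result(Future(body), Duration.Inf)
    } catch {
      case e: Exception if attempt < maxAttempts =>
        attempt += 1
        Thread.sleep(intervalMs)  // wait before the next attempt
    }
  }
  throw new IllegalStateException("unreachable")
}
```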

Creating KafkaOffsetReader Instance

KafkaOffsetReader takes the following when created:

KafkaOffsetReader initializes the internal registries and counters.

Fetching Offsets for Selected TopicPartitions — fetchSpecificOffsets Method

fetchSpecificOffsets(partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
Figure 1. KafkaOffsetReader’s fetchSpecificOffsets

fetchSpecificOffsets requests the Kafka Consumer to poll(0).

fetchSpecificOffsets requests the Kafka Consumer for assigned partitions (using Consumer.assignment()).

fetchSpecificOffsets requests the Kafka Consumer to pause(partitions).

You should see the following DEBUG message in the logs:

DEBUG KafkaOffsetReader: Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]

For every partition offset in the input partitionOffsets, fetchSpecificOffsets requests the Kafka Consumer to:

  • seekToEnd for the latest (aka -1)

  • seekToBeginning for the earliest (aka -2)

  • seek for other offsets

In the end, fetchSpecificOffsets creates a collection of Kafka’s TopicPartition and position (using the Kafka Consumer).
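The per-partition seek decision above can be sketched as a pattern match on the sentinel offsets (-1 for latest, -2 for earliest); the returned strings merely name the Consumer methods the real code would invoke:

```scala
// Sentinel values for special offsets (per the list above).
val LatestOffset = -1L
val EarliestOffset = -2L

// Sketch of the per-partition seek decision (simplified; the real code
// calls seekToEnd, seekToBeginning, or seek on Kafka's Consumer).
def seekAction(offset: Long): String = offset match {
  case LatestOffset   => "seekToEnd"
  case EarliestOffset => "seekToBeginning"
  case o              => s"seek($o)"
}
```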

Note
fetchSpecificOffsets is used when KafkaSource fetches and verifies initial partition offsets.

Creating Kafka Consumer — createConsumer Internal Method

createConsumer(): Consumer[Array[Byte], Array[Byte]]
Note
createConsumer is used when KafkaOffsetReader is created (to initialize the consumer) and in resetConsumer.
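Per Table 2, the consumer uses Array[Byte] keys and values. A sketch of the consumer properties involved follows (the group.id value is illustrative; in reality the properties come from the reader’s Kafka parameters, and creating the consumer requires kafka-clients on the classpath):

```scala
import java.util.Properties

// Sketch of the configuration for a byte-array Kafka consumer.
val props = new Properties()
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
// Illustrative unique group id (cf. groupId and nextId in Table 2).
props.put("group.id", s"spark-kafka-source-${java.util.UUID.randomUUID}")
// new KafkaConsumer[Array[Byte], Array[Byte]](props)  // needs kafka-clients
```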