StoreChangelogReader is the default ChangelogReader.
StoreChangelogReader is created for a StreamThread for the sole purpose of creating the TaskCreator, the StandbyTaskCreator and the TaskManager.
StoreChangelogReader is used for the following:
- Tasks to create a ProcessorStateManager
- TaskManager when suspending all (active and standby) tasks and state (at the beginning of consumer rebalance) and updateNewAndRestoringTasks (at the end of consumer rebalance)
Tip
|
Enable logging for the StoreChangelogReader logger (in the log4j configuration) to see what happens inside.
Refer to Application Logging Using log4j. |
StoreChangelogReader takes the following to be created:
StoreChangelogReader initializes the internal properties.
When created, StoreChangelogReader is given the poll duration that is configured using the StreamsConfig.POLL_MS_CONFIG configuration property.
The poll duration is used exclusively for the restore Consumer to fetch data for changelog partitions (using Consumer.poll) when restoring active StreamTasks (from the changelog partitions).
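A minimal sketch of where the poll duration could come from, assuming plain java.util.Properties so that it runs without Kafka on the classpath. The key "poll.ms" is the value of StreamsConfig.POLL_MS_CONFIG; the class name PollDurationExample and the 100 ms fallback used when the property is absent are assumptions made for illustration only.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
final class PollDurationExample {
    // Literal value of StreamsConfig.POLL_MS_CONFIG.
    static final String POLL_MS = "poll.ms";

    // Reads the poll duration from application properties,
    // falling back to an illustrative 100 ms when the key is unset.
    static Duration pollDuration(Properties props) {
        long millis = Long.parseLong(props.getProperty(POLL_MS, "100"));
        return Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(POLL_MS, "250");
        System.out.println(pollDuration(props).toMillis());
    }
}
```

A larger value makes each Consumer.poll on the restore Consumer wait longer for changelog records before returning.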
When created, StoreChangelogReader is given a user-defined StateRestoreListener.
StoreChangelogReader uses the StateRestoreListener exclusively when registering a StateRestorer for a changelog partition so that the StateRestorer to be registered is associated with the listener.
void register(StateRestorer restorer)
Note
|
register is part of ChangelogReader Contract to register a StateRestorer (for a changelog partition).
|
register requests the given StateRestorer for its changelog partition.
If the stateRestorers internal registry does not have the partition registered, register requests the StateRestorer to setUserRestoreListener with the userStateRestoreListener, adds it to the stateRestorers registry, and prints out the following TRACE message to the logs:
Added restorer for changelog [partition]
In the end, register adds the partition to the needsInitializing internal registry.
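The registration steps above can be sketched with plain collections. This is a simplified stand-in, not the Kafka Streams class: partitions are plain Strings, and RegisterSketch, SketchRestorer and the String-typed listener are hypothetical names introduced only for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for the registration bookkeeping described above.
final class RegisterSketch {
    // Hypothetical restorer: only the fields this sketch needs.
    static final class SketchRestorer {
        final String partition;
        String userListener; // stands in for the user StateRestoreListener

        SketchRestorer(String partition) {
            this.partition = partition;
        }
    }

    final Map<String, SketchRestorer> stateRestorers = new HashMap<>();
    final Set<String> needsInitializing = new HashSet<>();
    private final String userStateRestoreListener;

    RegisterSketch(String userStateRestoreListener) {
        this.userStateRestoreListener = userStateRestoreListener;
    }

    void register(SketchRestorer restorer) {
        // Only the first restorer registered for a partition is kept
        // and associated with the user listener.
        if (!stateRestorers.containsKey(restorer.partition)) {
            restorer.userListener = userStateRestoreListener;
            stateRestorers.put(restorer.partition, restorer);
        }
        // The partition is marked as needing initialization in either case.
        needsInitializing.add(restorer.partition);
    }
}
```

Registering a second restorer for an already-registered partition leaves the first one in place but still marks the partition for initialization.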
Map<TopicPartition, Long> restoredOffsets()
Note
|
restoredOffsets is part of ChangelogReader Contract to restore offsets.
|
restoredOffsets returns pairs of TopicPartition and restoredOffset (from the associated StateRestorer), but only for persistent state stores.
Internally, for every pair of TopicPartition and StateRestorer (in the stateRestorers internal registry), restoredOffsets requests the StateRestorer to restoredOffset when the restorer is persistent (i.e. when the associated state store is persistent).
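As a rough sketch of that filtering, assuming partitions modelled as Strings and a hypothetical SketchRestorer that carries only a persistence flag and a restored offset:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model; RestoredOffsetsSketch and SketchRestorer are hypothetical names.
final class RestoredOffsetsSketch {
    static final class SketchRestorer {
        final boolean persistent;
        final long restoredOffset;

        SketchRestorer(boolean persistent, long restoredOffset) {
            this.persistent = persistent;
            this.restoredOffset = restoredOffset;
        }
    }

    // Collects the restored offset of every restorer backed by a persistent store.
    static Map<String, Long> restoredOffsets(Map<String, SketchRestorer> stateRestorers) {
        Map<String, Long> offsets = new HashMap<>();
        for (Map.Entry<String, SketchRestorer> entry : stateRestorers.entrySet()) {
            SketchRestorer restorer = entry.getValue();
            if (restorer.persistent) {
                offsets.put(entry.getKey(), restorer.restoredOffset);
            }
        }
        return offsets;
    }
}
```

Partitions whose restorer is not persistent simply do not appear in the result.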
Collection<TopicPartition> restore(
RestoringTasks active)
Note
|
restore is part of the ChangelogReader Contract to restore logging-enabled state stores using the RestoringTasks.
|
restore initializes (with the given active RestoringTasks) when the needsInitializing internal registry has at least one changelog partition (that was added when registering a StateRestorer for a changelog partition).
restore simply finishes with the completely-restored changelog partitions when there are no changelog partitions that need restoring. restore also requests the restore Consumer to unsubscribe from changelog topics (KafkaConsumer.unsubscribe).
With at least one changelog partition that needs restoring, restore requests the restore Consumer to fetch data for the changelog partitions (KafkaConsumer.poll) with the poll duration.
restore …FIXME
restore removes all changelog partitions that are completed from the needsRestoring internal registry.
In the end, restore requests the restore Consumer to unsubscribe from changelog topics (KafkaConsumer.unsubscribe) when the needsRestoring internal registry has no changelog partition and simply finishes with the completely-restored changelog partitions.
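The documented part of this control flow can be sketched as follows. The sketch leaves out the initialization step and the record processing that this page does not yet describe: a hypothetical SketchConsumer.pollAndReportCompleted stands in for the poll plus whatever decides that a partition is completely restored. All names here (RestoreLoopSketch, SketchConsumer, String partitions) are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of a single restore call; not the Kafka Streams class.
final class RestoreLoopSketch {
    // Hypothetical minimal view of the restore Consumer.
    interface SketchConsumer {
        // Stands in for poll plus record processing; returns the partitions
        // that became completely restored in this round.
        Set<String> pollAndReportCompleted(long pollMs);

        void unsubscribe();
    }

    final Set<String> needsRestoring = new HashSet<>();
    final Set<String> completed = new HashSet<>();

    Set<String> restore(SketchConsumer consumer, long pollMs) {
        // Nothing left to restore: unsubscribe and finish early.
        if (needsRestoring.isEmpty()) {
            consumer.unsubscribe();
            return completed;
        }
        Set<String> done = consumer.pollAndReportCompleted(pollMs);
        completed.addAll(done);
        // Completed partitions no longer need restoring.
        needsRestoring.removeAll(done);
        if (needsRestoring.isEmpty()) {
            consumer.unsubscribe();
        }
        return completed;
    }
}
```

Calling restore repeatedly drains needsRestoring one round at a time, and the consumer is unsubscribed only once the registry is empty.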
In case of InvalidOffsetException, restore …FIXME
void initialize(RestoringTasks active)
initialize …FIXME
Note
|
initialize is used exclusively when StoreChangelogReader is requested to restore (and there are needsInitializing changelog partitions).
|
void startRestoration(
Set<TopicPartition> initialized,
RestoringTasks active)
startRestoration prints out the following DEBUG message to the logs:
Start restoring state stores from changelog topics [initialized]
startRestoration requests the restore Consumer for partition assignment, adds the initialized partitions and finally requests the restore Consumer to use the partitions only (aka manual partition assignment).
For every initialized partition, startRestoration uses the stateRestorers internal registry to find the associated StateRestorer that is then requested for the checkpoint offset.
There are two possible cases for the checkpoint offset.
When the checkpoint offset is known, startRestoration prints out the following TRACE message to the logs:
Found checkpoint [checkpoint] from changelog [partition] for store [storeName].
startRestoration requests the restore Consumer to seek (the fetch offsets) for the partition to the checkpoint.
startRestoration looks up the partition in the endOffsets internal registry and prints out the following DEBUG message to the logs:
Restoring partition [partition] from offset [startingOffset] to endOffset [endOffset]
startRestoration requests the StateRestorer to set the starting offset (with the offset of the next record to be fetched for the partition using the restore Consumer).
startRestoration requests the StateRestorer to restoreStarted.
When the checkpoint offset is unknown, startRestoration prints out the following TRACE message to the logs:
Did not find checkpoint from changelog [partition] for store [storeName], rewinding to beginning.
startRestoration requests the restore Consumer to seek to the beginning (KafkaConsumer.seekToBeginning) for the partition.
startRestoration adds the StateRestorer (of the partition) to the needsPositionUpdate local registry.
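The two checkpoint cases can be sketched as a small planning function. This is a simplified model that records the intended consumer calls as strings instead of invoking a real Consumer, and it tracks partitions rather than restorers in needsPositionUpdate. CheckpointSeekSketch and Plan are hypothetical names, and the NO_CHECKPOINT value of -1 is an illustrative sentinel for "checkpoint unknown".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model of the per-partition checkpoint decision.
final class CheckpointSeekSketch {
    // Illustrative sentinel meaning "no checkpoint is known".
    static final long NO_CHECKPOINT = -1L;

    static final class Plan {
        final List<String> seeks = new ArrayList<>();
        final List<String> needsPositionUpdate = new ArrayList<>();
    }

    static Plan plan(Map<String, Long> checkpointByPartition) {
        Plan plan = new Plan();
        for (Map.Entry<String, Long> entry : checkpointByPartition.entrySet()) {
            String partition = entry.getKey();
            long checkpoint = entry.getValue();
            if (checkpoint != NO_CHECKPOINT) {
                // Known checkpoint: resume from it.
                plan.seeks.add("seek(" + partition + ", " + checkpoint + ")");
            } else {
                // Unknown checkpoint: rewind and resolve the position later.
                plan.seeks.add("seekToBeginning(" + partition + ")");
                plan.needsPositionUpdate.add(partition);
            }
        }
        return plan;
    }
}
```

Deferring the position lookup for rewound partitions keeps the first pass over the initialized partitions free of position queries.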
For every StateRestorer in the needsPositionUpdate local registry (for which the checkpoint offset was unknown), startRestoration requests the StateRestorer for the partition.
startRestoration requests the given active RestoringTasks for the restoring StreamTask of the changelog partition.
There are two possible cases of the restoring StreamTask.
With Exactly-Once Support enabled, startRestoration prints out the following INFO message to the logs:
No checkpoint found for task [id] state store [storeName] changelog [partition] with EOS turned on. Reinitializing the task and restore its state from the beginning.
startRestoration removes the partition from the needsInitializing internal registry (and the initialized local registry).
startRestoration requests the StateRestorer to set the checkpoint offset (with the offset of the next record to be fetched for the partition using the restore Consumer).
startRestoration requests the StreamTask to reinitializeStateStoresForPartitions with the partition.
With Exactly-Once Support disabled, startRestoration prints out the following INFO message to the logs:
Restoring task [id]'s state store [storeName] from beginning of the changelog [partition]
startRestoration requests the restore Consumer for the offset of the next record to be fetched (position) for the partition of the StateRestorer.
startRestoration looks up the partition of the StateRestorer in the endOffsets internal registry and prints out the following DEBUG message to the logs:
Restoring partition [partition] from offset [position] to endOffset [endOffset]
startRestoration requests the StateRestorer to set the starting offset to the position (of the restore Consumer).
startRestoration requests the StateRestorer to restoreStarted.
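The difference between the two Exactly-Once cases can be sketched as below. This is a simplified model under stated assumptions: PositionUpdateSketch and SketchRestorer are hypothetical names, partitions are Strings, and the call to reinitializeStateStoresForPartitions is represented only by the boolean return value.

```java
import java.util.Set;

// Simplified model of the position update for a restorer without a checkpoint.
final class PositionUpdateSketch {
    // Hypothetical restorer state; -1 means "not set" in this sketch.
    static final class SketchRestorer {
        long checkpointOffset = -1L;
        long startingOffset = -1L;
        boolean restoreStarted;
    }

    // Returns true when the task's state stores must be reinitialized (EOS case).
    static boolean update(String partition, SketchRestorer restorer, boolean eosEnabled,
                          long consumerPosition,
                          Set<String> needsInitializing, Set<String> initialized) {
        if (eosEnabled) {
            // The partition is taken out of this restoration round.
            needsInitializing.remove(partition);
            initialized.remove(partition);
            restorer.checkpointOffset = consumerPosition;
            return true;
        }
        // Without EOS, restoration starts from the consumer position right away.
        restorer.startingOffset = consumerPosition;
        restorer.restoreStarted = true;
        return false;
    }
}
```

In the EOS case the partition leaves the initialized set, so it is not among the partitions added to needsRestoring at the end of startRestoration.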
In the end, startRestoration adds all initialized partitions to the needsRestoring internal registry.
Note
|
startRestoration is used exclusively when StoreChangelogReader is requested to initialize (when requested to restore).
|
long processNext(
List<ConsumerRecord<byte[], byte[]>> records,
StateRestorer restorer,
Long endOffset)
processNext …FIXME
Note
|
processNext is used exclusively when StoreChangelogReader is requested to restore active StreamTasks (from changelog partitions).
|
Name | Description |
---|---|
needsInitializing | Changelog partitions (of StateRestorers) that need initializing. Used in restore |
needsRestoring | Changelog partitions (of StateRestorers) that need restoring. Used in restore |
stateRestorers | StateRestorers per partition of changelog topic of a state store. Used in restore, initialize, and restoredOffsets |