Skip to content

Commit

Permalink
fix: get all available restore commands even on poll timeout (#6985)
Browse files Browse the repository at this point in the history
our restore logic differentiates on restore behaviour vs live behaviour
to avoid making some newly added referential integrity checks and therefore
ensure we execute anything that was already logged. While this isn't the
best way to guarantee compatibility, it's what we're doing now and we have
a problem: sometimes the restore poll times out and returns no records,
so we don't execute any restore phase. This patch fixes this by first
sampling the end offset of the log and ensuring that all rows between the
begin and end are returned.
  • Loading branch information
rodesai authored Feb 23, 2021
1 parent 3827af7 commit 28a7ba9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,16 @@ public Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(final Duration ti
public List<QueuedCommand> getRestoreCommands(final Duration duration) {
final List<QueuedCommand> restoreCommands = Lists.newArrayList();

final long endOffset = getEndOffset();

commandConsumer.seekToBeginning(
Collections.singletonList(commandTopicPartition));

log.debug("Reading prior command records");
ConsumerRecords<byte[], byte[]> records =
commandConsumer.poll(duration);
while (!records.isEmpty()) {
log.debug("Received {} records from poll", records.count());
log.info("Reading prior command records up to offset {}", endOffset);

while (commandConsumer.position(commandTopicPartition) < endOffset) {
final ConsumerRecords<byte[], byte[]> records = commandConsumer.poll(duration);
log.info("Received {} records from command topic restore poll", records.count());
for (final ConsumerRecord<byte[], byte[]> record : records) {
try {
backupRecord(record);
Expand All @@ -132,7 +134,6 @@ public List<QueuedCommand> getRestoreCommands(final Duration duration) {
Optional.empty(),
record.offset()));
}
records = commandConsumer.poll(duration);
}
return restoreCommands;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void setup() {
consumerRecords =
new ConsumerRecords<>(Collections.singletonMap(topicPartition, ImmutableList.of(record1, record2, record3)));
commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandTopicBackup);
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 0L));
}

@Test
Expand Down Expand Up @@ -128,8 +129,9 @@ public void shouldNotGetCommandsWhenCommandTopicCorruptionIhBackupInRestore() {
record1,
record2))
.thenReturn(someConsumerRecords(
record3))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
record3));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L);
doNothing().doThrow(new CommandTopicCorruptionException("error")).when(commandTopicBackup).writeRecord(any());

// When:
Expand All @@ -153,8 +155,9 @@ public void shouldGetRestoreCommandsCorrectly() {
record1,
record2))
.thenReturn(someConsumerRecords(
record3))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
record3));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 3L);

// When:
final List<QueuedCommand> queuedCommandList = commandTopic
Expand All @@ -178,8 +181,9 @@ public void shouldHaveOffsetsInQueuedCommands() {
new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
new ConsumerRecord<>("topic", 0, 1, commandId2, command2)))
.thenReturn(someConsumerRecords(
new ConsumerRecord<>("topic", 0, 2, commandId3, command3)))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
new ConsumerRecord<>("topic", 0, 2, commandId3, command3)));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 3L);

// When:
final List<QueuedCommand> queuedCommandList = commandTopic
Expand All @@ -195,6 +199,27 @@ public void shouldHaveOffsetsInQueuedCommands() {
new QueuedCommand(commandId3, command3, Optional.empty(), 2L))));
}

@Test
public void shouldGetRestoreCommandsCorrectlyOnPollTimeout() {
// Given:
when(commandConsumer.poll(any(Duration.class)))
.thenReturn(ConsumerRecords.empty())
.thenReturn(someConsumerRecords(
new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
new ConsumerRecord<>("topic", 0, 1, commandId2, command2)));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 2L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 0L, 2L);

// When:
final List<QueuedCommand> queuedCommandList = commandTopic
.getRestoreCommands(Duration.ofMillis(1));

// Then:
assertThat(queuedCommandList, equalTo(ImmutableList.of(
new QueuedCommand(commandId1, command1, Optional.empty(), 0L),
new QueuedCommand(commandId2, command2, Optional.empty(), 1L))));
}

@Test
public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() {
// Given:
Expand All @@ -204,8 +229,9 @@ public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() {
new ConsumerRecord<>("topic", 0, 1, commandId2, command2)))
.thenReturn(someConsumerRecords(
new ConsumerRecord<>("topic", 0, 2, commandId2, command3),
new ConsumerRecord<>("topic", 0, 3, commandId3, command3)))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
new ConsumerRecord<>("topic", 0, 3, commandId3, command3)));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 4L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 4L);

// When:
final List<QueuedCommand> queuedCommandList = commandTopic
Expand All @@ -227,8 +253,10 @@ public void shouldFilterNullCommandsWhileRestoringCommands() {
record1,
record2,
new ConsumerRecord<>("topic", 0, 2, commandId2, null)
))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
));
when(commandConsumer.endOffsets(any()))
.thenReturn(Collections.singletonMap(TOPIC_PARTITION, 2L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L);

// When:
final List<QueuedCommand> recordList = commandTopic
Expand Down Expand Up @@ -262,10 +290,9 @@ public void shouldCloseAllResources() {
public void shouldHaveAllCreateCommandsInOrder() {
// Given:
final ConsumerRecords<byte[], byte[]> records = someConsumerRecords(record1, record2, record3);

when(commandTopic.getNewCommands(any()))
.thenReturn(records)
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
when(commandTopic.getNewCommands(any())).thenReturn(records);
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 3L);

// When:
final List<QueuedCommand> commands = commandTopic.getRestoreCommands(Duration.ofMillis(10));
Expand Down Expand Up @@ -300,9 +327,9 @@ public void shouldCloseCommandTopicBackup() {
public void shouldBackupRestoreCommands() {
// Given
when(commandConsumer.poll(any(Duration.class)))
.thenReturn(someConsumerRecords(record1, record2))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
commandTopic.start();
.thenReturn(someConsumerRecords(record1, record2));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 2L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L);

// When
commandTopic.getRestoreCommands(Duration.ofHours(1));
Expand Down

0 comments on commit 28a7ba9

Please sign in to comment.