Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: get all available restore commands even on poll timeout #6985

Merged
merged 1 commit into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,16 @@ public Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(final Duration ti
public List<QueuedCommand> getRestoreCommands(final Duration duration) {
final List<QueuedCommand> restoreCommands = Lists.newArrayList();

final long endOffset = getEndOffset();

commandConsumer.seekToBeginning(
Collections.singletonList(commandTopicPartition));

log.debug("Reading prior command records");
ConsumerRecords<byte[], byte[]> records =
commandConsumer.poll(duration);
while (!records.isEmpty()) {
log.debug("Received {} records from poll", records.count());
log.info("Reading prior command records up to offset {}", endOffset);

while (commandConsumer.position(commandTopicPartition) < endOffset) {
final ConsumerRecords<byte[], byte[]> records = commandConsumer.poll(duration);
log.info("Received {} records from command topic restore poll", records.count());
for (final ConsumerRecord<byte[], byte[]> record : records) {
try {
backupRecord(record);
Expand All @@ -132,7 +134,6 @@ public List<QueuedCommand> getRestoreCommands(final Duration duration) {
Optional.empty(),
record.offset()));
}
records = commandConsumer.poll(duration);
}
return restoreCommands;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void setup() {
consumerRecords =
new ConsumerRecords<>(Collections.singletonMap(topicPartition, ImmutableList.of(record1, record2, record3)));
commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandTopicBackup);
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 0L));
}

@Test
Expand Down Expand Up @@ -128,8 +129,9 @@ public void shouldNotGetCommandsWhenCommandTopicCorruptionIhBackupInRestore() {
record1,
record2))
.thenReturn(someConsumerRecords(
record3))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
record3));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L);
doNothing().doThrow(new CommandTopicCorruptionException("error")).when(commandTopicBackup).writeRecord(any());

// When:
Expand All @@ -153,8 +155,9 @@ public void shouldGetRestoreCommandsCorrectly() {
record1,
record2))
.thenReturn(someConsumerRecords(
record3))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
record3));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 3L);

// When:
final List<QueuedCommand> queuedCommandList = commandTopic
Expand All @@ -178,8 +181,9 @@ public void shouldHaveOffsetsInQueuedCommands() {
new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
new ConsumerRecord<>("topic", 0, 1, commandId2, command2)))
.thenReturn(someConsumerRecords(
new ConsumerRecord<>("topic", 0, 2, commandId3, command3)))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
new ConsumerRecord<>("topic", 0, 2, commandId3, command3)));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 3L);

// When:
final List<QueuedCommand> queuedCommandList = commandTopic
Expand All @@ -195,6 +199,27 @@ public void shouldHaveOffsetsInQueuedCommands() {
new QueuedCommand(commandId3, command3, Optional.empty(), 2L))));
}

@Test
public void shouldGetRestoreCommandsCorrectlyOnPollTimeout() {
// Given:
when(commandConsumer.poll(any(Duration.class)))
.thenReturn(ConsumerRecords.empty())
.thenReturn(someConsumerRecords(
new ConsumerRecord<>("topic", 0, 0, commandId1, command1),
new ConsumerRecord<>("topic", 0, 1, commandId2, command2)));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 2L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 0L, 2L);

// When:
final List<QueuedCommand> queuedCommandList = commandTopic
.getRestoreCommands(Duration.ofMillis(1));

// Then:
assertThat(queuedCommandList, equalTo(ImmutableList.of(
new QueuedCommand(commandId1, command1, Optional.empty(), 0L),
new QueuedCommand(commandId2, command2, Optional.empty(), 1L))));
}

@Test
public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() {
// Given:
Expand All @@ -204,8 +229,9 @@ public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() {
new ConsumerRecord<>("topic", 0, 1, commandId2, command2)))
.thenReturn(someConsumerRecords(
new ConsumerRecord<>("topic", 0, 2, commandId2, command3),
new ConsumerRecord<>("topic", 0, 3, commandId3, command3)))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
new ConsumerRecord<>("topic", 0, 3, commandId3, command3)));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 4L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 4L);

// When:
final List<QueuedCommand> queuedCommandList = commandTopic
Expand All @@ -227,8 +253,10 @@ public void shouldFilterNullCommandsWhileRestoringCommands() {
record1,
record2,
new ConsumerRecord<>("topic", 0, 2, commandId2, null)
))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
));
when(commandConsumer.endOffsets(any()))
.thenReturn(Collections.singletonMap(TOPIC_PARTITION, 2L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L);

// When:
final List<QueuedCommand> recordList = commandTopic
Expand Down Expand Up @@ -262,10 +290,9 @@ public void shouldCloseAllResources() {
public void shouldHaveAllCreateCommandsInOrder() {
// Given:
final ConsumerRecords<byte[], byte[]> records = someConsumerRecords(record1, record2, record3);

when(commandTopic.getNewCommands(any()))
.thenReturn(records)
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
when(commandTopic.getNewCommands(any())).thenReturn(records);
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 3L);

// When:
final List<QueuedCommand> commands = commandTopic.getRestoreCommands(Duration.ofMillis(10));
Expand Down Expand Up @@ -300,9 +327,9 @@ public void shouldCloseCommandTopicBackup() {
public void shouldBackupRestoreCommands() {
// Given
when(commandConsumer.poll(any(Duration.class)))
.thenReturn(someConsumerRecords(record1, record2))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
commandTopic.start();
.thenReturn(someConsumerRecords(record1, record2));
when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 2L));
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L);

// When
commandTopic.getRestoreCommands(Duration.ofHours(1));
Expand Down