From 28a7ba9a3d05a821eea702dcfaff046463a5394c Mon Sep 17 00:00:00 2001 From: Rohan Date: Tue, 23 Feb 2021 11:53:29 -0800 Subject: [PATCH] fix: get all available restore commands even on poll timeout (#6985) Our restore logic differentiates between restore behaviour and live behaviour in order to avoid making some newly added referential integrity checks and thereby ensure we execute anything that was already logged. While this isn't the best way to guarantee compatibility, it's what we're doing now, and it has a problem: sometimes the restore poll times out and returns no records, so we don't execute any restore phase at all. This patch fixes that by first sampling the end offset of the log and then ensuring that all records between the beginning and that end offset are returned. --- .../ksql/rest/server/CommandTopic.java | 13 ++-- .../ksql/rest/server/CommandTopicTest.java | 61 +++++++++++++------ 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java index f65bcb13b48f..f4a7a422a6e1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java @@ -104,14 +104,16 @@ public Iterable> getNewCommands(final Duration ti public List getRestoreCommands(final Duration duration) { final List restoreCommands = Lists.newArrayList(); + final long endOffset = getEndOffset(); + commandConsumer.seekToBeginning( Collections.singletonList(commandTopicPartition)); - log.debug("Reading prior command records"); - ConsumerRecords records = - commandConsumer.poll(duration); - while (!records.isEmpty()) { - log.debug("Received {} records from poll", records.count()); + log.info("Reading prior command records up to offset {}", endOffset); + + while (commandConsumer.position(commandTopicPartition) < endOffset) { + final ConsumerRecords records = 
commandConsumer.poll(duration); + log.info("Received {} records from command topic restore poll", records.count()); for (final ConsumerRecord record : records) { try { backupRecord(record); @@ -132,7 +134,6 @@ public List getRestoreCommands(final Duration duration) { Optional.empty(), record.offset())); } - records = commandConsumer.poll(duration); } return restoreCommands; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java index be1bc79ef8ce..ff54c8ff79b3 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java @@ -91,6 +91,7 @@ public void setup() { consumerRecords = new ConsumerRecords<>(Collections.singletonMap(topicPartition, ImmutableList.of(record1, record2, record3))); commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandTopicBackup); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 0L)); } @Test @@ -128,8 +129,9 @@ public void shouldNotGetCommandsWhenCommandTopicCorruptionIhBackupInRestore() { record1, record2)) .thenReturn(someConsumerRecords( - record3)) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + record3)); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L); doNothing().doThrow(new CommandTopicCorruptionException("error")).when(commandTopicBackup).writeRecord(any()); // When: @@ -153,8 +155,9 @@ public void shouldGetRestoreCommandsCorrectly() { record1, record2)) .thenReturn(someConsumerRecords( - record3)) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + record3)); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L)); + 
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 3L); // When: final List queuedCommandList = commandTopic @@ -178,8 +181,9 @@ public void shouldHaveOffsetsInQueuedCommands() { new ConsumerRecord<>("topic", 0, 0, commandId1, command1), new ConsumerRecord<>("topic", 0, 1, commandId2, command2))) .thenReturn(someConsumerRecords( - new ConsumerRecord<>("topic", 0, 2, commandId3, command3))) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + new ConsumerRecord<>("topic", 0, 2, commandId3, command3))); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 3L); // When: final List queuedCommandList = commandTopic @@ -195,6 +199,27 @@ public void shouldHaveOffsetsInQueuedCommands() { new QueuedCommand(commandId3, command3, Optional.empty(), 2L)))); } + @Test + public void shouldGetRestoreCommandsCorrectlyOnPollTimeout() { + // Given: + when(commandConsumer.poll(any(Duration.class))) + .thenReturn(ConsumerRecords.empty()) + .thenReturn(someConsumerRecords( + new ConsumerRecord<>("topic", 0, 0, commandId1, command1), + new ConsumerRecord<>("topic", 0, 1, commandId2, command2))); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 2L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 0L, 2L); + + // When: + final List queuedCommandList = commandTopic + .getRestoreCommands(Duration.ofMillis(1)); + + // Then: + assertThat(queuedCommandList, equalTo(ImmutableList.of( + new QueuedCommand(commandId1, command1, Optional.empty(), 0L), + new QueuedCommand(commandId2, command2, Optional.empty(), 1L)))); + } + @Test public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() { // Given: @@ -204,8 +229,9 @@ public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() { new ConsumerRecord<>("topic", 0, 1, commandId2, command2))) .thenReturn(someConsumerRecords( new ConsumerRecord<>("topic", 
0, 2, commandId2, command3), - new ConsumerRecord<>("topic", 0, 3, commandId3, command3))) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + new ConsumerRecord<>("topic", 0, 3, commandId3, command3))); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 4L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 4L); // When: final List queuedCommandList = commandTopic @@ -227,8 +253,10 @@ public void shouldFilterNullCommandsWhileRestoringCommands() { record1, record2, new ConsumerRecord<>("topic", 0, 2, commandId2, null) - )) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + )); + when(commandConsumer.endOffsets(any())) + .thenReturn(Collections.singletonMap(TOPIC_PARTITION, 2L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L); // When: final List recordList = commandTopic @@ -262,10 +290,9 @@ public void shouldCloseAllResources() { public void shouldHaveAllCreateCommandsInOrder() { // Given: final ConsumerRecords records = someConsumerRecords(record1, record2, record3); - - when(commandTopic.getNewCommands(any())) - .thenReturn(records) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + when(commandTopic.getNewCommands(any())).thenReturn(records); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 3L); // When: final List commands = commandTopic.getRestoreCommands(Duration.ofMillis(10)); @@ -300,9 +327,9 @@ public void shouldCloseCommandTopicBackup() { public void shouldBackupRestoreCommands() { // Given when(commandConsumer.poll(any(Duration.class))) - .thenReturn(someConsumerRecords(record1, record2)) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); - commandTopic.start(); + .thenReturn(someConsumerRecords(record1, record2)); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 2L)); + 
when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L); // When commandTopic.getRestoreCommands(Duration.ofHours(1));