diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java index f65bcb13b48f..f4a7a422a6e1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java @@ -104,14 +104,16 @@ public Iterable> getNewCommands(final Duration ti public List getRestoreCommands(final Duration duration) { final List restoreCommands = Lists.newArrayList(); + final long endOffset = getEndOffset(); + commandConsumer.seekToBeginning( Collections.singletonList(commandTopicPartition)); - log.debug("Reading prior command records"); - ConsumerRecords records = - commandConsumer.poll(duration); - while (!records.isEmpty()) { - log.debug("Received {} records from poll", records.count()); + log.info("Reading prior command records up to offset {}", endOffset); + + while (commandConsumer.position(commandTopicPartition) < endOffset) { + final ConsumerRecords records = commandConsumer.poll(duration); + log.info("Received {} records from command topic restore poll", records.count()); for (final ConsumerRecord record : records) { try { backupRecord(record); @@ -132,7 +134,6 @@ public List getRestoreCommands(final Duration duration) { Optional.empty(), record.offset())); } - records = commandConsumer.poll(duration); } return restoreCommands; } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java index be1bc79ef8ce..ff54c8ff79b3 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java @@ -91,6 +91,7 @@ public void setup() { consumerRecords = new ConsumerRecords<>(Collections.singletonMap(topicPartition, ImmutableList.of(record1, record2, record3))); commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandTopicBackup); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 0L)); } @Test @@ -128,8 +129,9 @@ public void shouldNotGetCommandsWhenCommandTopicCorruptionIhBackupInRestore() { record1, record2)) .thenReturn(someConsumerRecords( - record3)) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + record3)); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L); doNothing().doThrow(new CommandTopicCorruptionException("error")).when(commandTopicBackup).writeRecord(any()); // When: @@ -153,8 +155,9 @@ public void shouldGetRestoreCommandsCorrectly() { record1, record2)) .thenReturn(someConsumerRecords( - record3)) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + record3)); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 3L); // When: final List queuedCommandList = commandTopic @@ -178,8 +181,9 @@ public void shouldHaveOffsetsInQueuedCommands() { new ConsumerRecord<>("topic", 0, 0, commandId1, command1), new ConsumerRecord<>("topic", 0, 1, commandId2, command2))) .thenReturn(someConsumerRecords( - new ConsumerRecord<>("topic", 0, 2, commandId3, command3))) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + new ConsumerRecord<>("topic", 0, 2, commandId3, command3))); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 3L); // When: final List queuedCommandList = commandTopic @@ -195,6 +199,27 @@ public void shouldHaveOffsetsInQueuedCommands() { new QueuedCommand(commandId3, command3, Optional.empty(), 2L)))); } + @Test + public void shouldGetRestoreCommandsCorrectlyOnPollTimeout() { + // Given: + when(commandConsumer.poll(any(Duration.class))) + .thenReturn(ConsumerRecords.empty()) + .thenReturn(someConsumerRecords( + new ConsumerRecord<>("topic", 0, 0, commandId1, command1), + new ConsumerRecord<>("topic", 0, 1, commandId2, command2))); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 2L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 0L, 2L); + + // When: + final List queuedCommandList = commandTopic + .getRestoreCommands(Duration.ofMillis(1)); + + // Then: + assertThat(queuedCommandList, equalTo(ImmutableList.of( + new QueuedCommand(commandId1, command1, Optional.empty(), 0L), + new QueuedCommand(commandId2, command2, Optional.empty(), 1L)))); + } + @Test public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() { // Given: @@ -204,8 +229,9 @@ public void shouldGetRestoreCommandsCorrectlyWithDuplicateKeys() { new ConsumerRecord<>("topic", 0, 1, commandId2, command2))) .thenReturn(someConsumerRecords( new ConsumerRecord<>("topic", 0, 2, commandId2, command3), - new ConsumerRecord<>("topic", 0, 3, commandId3, command3))) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + new ConsumerRecord<>("topic", 0, 3, commandId3, command3))); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 4L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L, 4L); // When: final List queuedCommandList = commandTopic @@ -227,8 +253,10 @@ public void shouldFilterNullCommandsWhileRestoringCommands() { record1, record2, new ConsumerRecord<>("topic", 0, 2, commandId2, null) - )) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + )); + when(commandConsumer.endOffsets(any())) + .thenReturn(Collections.singletonMap(TOPIC_PARTITION, 2L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L); // When: final List recordList = commandTopic @@ -262,10 +290,9 @@ public void shouldCloseAllResources() { public void shouldHaveAllCreateCommandsInOrder() { // Given: final ConsumerRecords records = someConsumerRecords(record1, record2, record3); - - when(commandTopic.getNewCommands(any())) - .thenReturn(records) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + when(commandTopic.getNewCommands(any())).thenReturn(records); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 3L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 3L); // When: final List commands = commandTopic.getRestoreCommands(Duration.ofMillis(10)); @@ -300,9 +327,9 @@ public void shouldCloseCommandTopicBackup() { public void shouldBackupRestoreCommands() { // Given when(commandConsumer.poll(any(Duration.class))) - .thenReturn(someConsumerRecords(record1, record2)) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); - commandTopic.start(); + .thenReturn(someConsumerRecords(record1, record2)); + when(commandConsumer.endOffsets(any())).thenReturn(ImmutableMap.of(TOPIC_PARTITION, 2L)); + when(commandConsumer.position(TOPIC_PARTITION)).thenReturn(0L, 2L); // When commandTopic.getRestoreCommands(Duration.ofHours(1));