diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/AvroTransformer.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/AvroTransformer.java
index dd251669..a33b464e 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/AvroTransformer.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/AvroTransformer.java
@@ -53,9 +53,9 @@ public void configureValueConverter(final Map config, final S3So
 
     @Override
     public Stream getRecords(final IOSupplier inputStreamIOSupplier, final String topic,
-            final int topicPartition, final S3SourceConfig s3SourceConfig) {
+            final int topicPartition, final S3SourceConfig s3SourceConfig, final long skipRecords) {
         final DatumReader datumReader = new GenericDatumReader<>();
-        return readAvroRecordsAsStream(inputStreamIOSupplier, datumReader);
+        return readAvroRecordsAsStream(inputStreamIOSupplier, datumReader, skipRecords);
     }
 
     @Override
@@ -65,13 +65,13 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou
     }
 
     private Stream readAvroRecordsAsStream(final IOSupplier inputStreamIOSupplier,
-            final DatumReader datumReader) {
+            final DatumReader datumReader, final long skipRecords) {
         try (DataFileStream dataFileStream = new DataFileStream<>(inputStreamIOSupplier.get(), datumReader)) {
             // Wrap DataFileStream in a Stream using a Spliterator for lazy processing
             return StreamSupport.stream(
                     Spliterators.spliteratorUnknownSize(dataFileStream, Spliterator.ORDERED | Spliterator.NONNULL),
-                    false);
+                    false).skip(skipRecords).map(record -> record);
         } catch (IOException e) {
             LOGGER.error("Error in DataFileStream: {}", e.getMessage(), e);
             return Stream.empty(); // Return an empty stream if initialization fails
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformer.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformer.java
index 8e36cab8..3d006419 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformer.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformer.java
@@ -40,7 +40,7 @@ public void configureValueConverter(final Map config, final S3So
 
     @Override
     public Stream getRecords(final IOSupplier inputStreamIOSupplier, final String topic,
-            final int topicPartition, final S3SourceConfig s3SourceConfig) {
+            final int topicPartition, final S3SourceConfig s3SourceConfig, final long skipRecords) {
 
         // Create a Stream that processes each chunk lazily
         return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
@@ -62,7 +62,7 @@ public boolean tryAdvance(final java.util.function.Consumer acti
                     return false;
                 }
             }
-        }, false);
+        }, false).skip(skipRecords);
     }
 
     @Override
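Every transformer touched in this change applies the same pattern: wrap the underlying reader in a Spliterator-backed lazy Stream, then call .skip(skipRecords) so records that were already committed are discarded instead of being re-emitted (the skipped records are still read and decoded; only their emission is suppressed). A self-contained sketch of that pattern on a plain InputStream follows; the class name, chunk size and sample data are illustrative and not part of the connector.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ChunkedSkipSketch {

    // Lazily chunk an InputStream, as ByteArrayTransformer does, then skip already-processed chunks.
    static Stream<byte[]> chunks(final InputStream input, final int chunkSize, final long skipRecords) {
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<byte[]>(Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(final Consumer<? super byte[]> action) {
                try {
                    final byte[] chunk = input.readNBytes(chunkSize);
                    if (chunk.length == 0) {
                        return false; // end of stream
                    }
                    action.accept(chunk);
                    return true;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }, false).skip(skipRecords); // skipped chunks are still read, just never handed to the caller
    }

    public static void main(final String[] args) {
        final InputStream data = new ByteArrayInputStream("abcdef".getBytes());
        chunks(data, 2, 2L).forEach(chunk -> System.out.println(new String(chunk))); // prints only "ef"
    }
}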
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/JsonTransformer.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/JsonTransformer.java
index 80827fd8..541bc004 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/JsonTransformer.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/JsonTransformer.java
@@ -51,8 +51,8 @@ public void configureValueConverter(final Map config, final S3So
 
     @Override
     public Stream getRecords(final IOSupplier inputStreamIOSupplier, final String topic,
-            final int topicPartition, final S3SourceConfig s3SourceConfig) {
-        return readJsonRecordsAsStream(inputStreamIOSupplier);
+            final int topicPartition, final S3SourceConfig s3SourceConfig, final long skipRecords) {
+        return readJsonRecordsAsStream(inputStreamIOSupplier, skipRecords);
     }
 
     @Override
@@ -65,7 +65,8 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou
         }
     }
 
-    private Stream readJsonRecordsAsStream(final IOSupplier inputStreamIOSupplier) {
+    private Stream readJsonRecordsAsStream(final IOSupplier inputStreamIOSupplier,
+            final long skipRecords) {
         // Use a Stream that lazily processes each line as a JSON object
         CustomSpliterator customSpliteratorParam;
         try {
@@ -80,7 +81,7 @@ private Stream readJsonRecordsAsStream(final IOSupplier inp
             } catch (IOException e) {
                 LOGGER.error("Error closing BufferedReader: {}", e.getMessage(), e);
             }
-        });
+        }).skip(skipRecords);
     }
 
     /*
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ParquetTransformer.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ParquetTransformer.java
index 48b0abd3..d806263c 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ParquetTransformer.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/ParquetTransformer.java
@@ -54,8 +54,8 @@ public void configureValueConverter(final Map config, final S3So
 
     @Override
     public Stream getRecords(final IOSupplier inputStreamIOSupplier, final String topic,
-            final int topicPartition, final S3SourceConfig s3SourceConfig) {
-        return getParquetStreamRecords(inputStreamIOSupplier, topic, topicPartition);
+            final int topicPartition, final S3SourceConfig s3SourceConfig, final long skipRecords) {
+        return getParquetStreamRecords(inputStreamIOSupplier, topic, topicPartition, skipRecords);
     }
 
     @Override
@@ -65,7 +65,7 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou
     }
 
     private Stream getParquetStreamRecords(final IOSupplier inputStreamIOSupplier,
-            final String topic, final int topicPartition) {
+            final String topic, final int topicPartition, final long skipRecords) {
         final String timestamp = String.valueOf(Instant.now().toEpochMilli());
 
         File parquetFile;
@@ -104,7 +104,7 @@ public boolean tryAdvance(final java.util.function.Consumer acti
                     return false;
                 }
             }
-        }, false).onClose(() -> {
+        }, false).skip(skipRecords).onClose(() -> {
             try {
                 parquetReader.close(); // Ensure reader is closed when the stream is closed
             } catch (IOException e) {
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/Transformer.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/Transformer.java
index 616cfdb7..6c900783 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/Transformer.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/input/Transformer.java
@@ -29,7 +29,7 @@ public interface Transformer {
     void configureValueConverter(Map config, S3SourceConfig s3SourceConfig);
 
     Stream getRecords(IOSupplier inputStreamIOSupplier, String topic, int topicPartition,
-            S3SourceConfig s3SourceConfig);
+            S3SourceConfig s3SourceConfig, long skipRecords);
 
     byte[] getValueBytes(Object record, String topic, S3SourceConfig s3SourceConfig);
 }
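The interface change above makes the skip count part of the transformer contract: the caller decides how many records of the current object were already processed and the implementation applies that to the stream it returns. A minimal stand-in sketch of that contract using only java.util.stream; the RecordSource interface and the sample values are hypothetical, not the connector's API.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TransformerCallSketch {

    // Toy stand-in for the new contract; the real Transformer also takes the IOSupplier,
    // topic, partition and S3SourceConfig shown in the diff above.
    interface RecordSource {
        Stream<Object> getRecords(long skipRecords);
    }

    public static void main(final String[] args) {
        final RecordSource source = skipRecords -> Stream.of((Object) "rec-0", "rec-1", "rec-2").skip(skipRecords);

        final long alreadyProcessed = 2L; // e.g. the count stored per object key in the offset map
        final List<Object> remaining = source.getRecords(alreadyProcessed).collect(Collectors.toList());
        System.out.println(remaining); // [rec-2]
    }
}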
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/OffsetManager.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/OffsetManager.java
index 2fc195f0..22c875ba 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/OffsetManager.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/OffsetManager.java
@@ -85,6 +85,19 @@ public String getObjectMapKey(final String currentObjectKey) {
         return OBJECT_KEY + SEPARATOR + currentObjectKey;
     }
 
+    public long numberOfRecordsToSkipProcessing(final Map partitionMap, final String currentObjectKey) {
+        if (offsets.containsKey(partitionMap)) {
+            final Map offsetVal = offsets.get(partitionMap);
+            final String objectMapKey = getObjectMapKey(currentObjectKey);
+
+            if (offsetVal.containsKey(objectMapKey)) {
+                return (long) offsetVal.get(objectMapKey);
+            }
+            return 0;
+        }
+        return 0;
+    }
+
     public boolean shouldSkipRecord(final Map partitionMap, final String currentObjectKey,
             final long numOfProcessedRecs) {
         if (offsets.containsKey(partitionMap)) {
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
index 3a6c4081..58327322 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
@@ -131,29 +131,20 @@ private List readNext() {
 
         final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8);
         final List sourceRecords = new ArrayList<>();
 
-        int numOfProcessedRecs = 1;
-        boolean checkOffsetMap = true;
+        final long numberOfRecsAlreadyProcessed = offsetManager.numberOfRecordsToSkipProcessing(partitionMap,
+                currentObjectKey);
+
         try (Stream recordStream = transformer.getRecords(s3Object::getObjectContent, topic,
-                topicPartition, s3SourceConfig)) {
+                topicPartition, s3SourceConfig, numberOfRecsAlreadyProcessed)) {
             final Iterator recordIterator = recordStream.iterator();
             while (recordIterator.hasNext()) {
                 final Object record = recordIterator.next();
 
-                // Check if the record should be skipped based on the offset
-                if (offsetManager.shouldSkipRecord(partitionMap, currentObjectKey, numOfProcessedRecs)
-                        && checkOffsetMap) {
-                    numOfProcessedRecs++;
-                    continue;
-                }
-
                 final byte[] valueBytes = transformer.getValueBytes(record, topic, s3SourceConfig);
-                checkOffsetMap = false;
 
                 sourceRecords.add(getSourceRecord(keyBytes, valueBytes, topic, topicPartition, offsetManager,
                         startOffset, partitionMap));
-                numOfProcessedRecs++;
-
                 // Break if we have reached the max records per poll
                 if (sourceRecords.size() >= s3SourceConfig.getInt(MAX_POLL_RECORDS)) {
                     break;
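With the two hunks above, readNext() no longer filters records one by one via shouldSkipRecord; it asks the OffsetManager once how many records of the current object are already committed and hands that count to the transformer. The sketch below mirrors the nested-map lookup in numberOfRecordsToSkipProcessing; the map contents and the object-key format are assumptions made for illustration.

import java.util.HashMap;
import java.util.Map;

public class OffsetLookupSketch {

    // Same shape as the lookup added to OffsetManager: partition map -> (object-key entry ->
    // records already processed), defaulting to 0 when either level is missing.
    static long recordsToSkip(final Map<Map<String, Object>, Map<String, Object>> offsets,
            final Map<String, Object> partitionMap, final String objectMapKey) {
        final Map<String, Object> offsetVal = offsets.get(partitionMap);
        if (offsetVal != null && offsetVal.containsKey(objectMapKey)) {
            return (long) offsetVal.get(objectMapKey);
        }
        return 0L;
    }

    public static void main(final String[] args) {
        final Map<String, Object> partitionMap = new HashMap<>();
        partitionMap.put("bucket", "test-bucket");
        partitionMap.put("topic", "test-topic");

        final Map<String, Object> perObject = new HashMap<>();
        perObject.put("object_key_some-object", 5L); // key format is illustrative

        final Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        offsets.put(partitionMap, perObject);

        System.out.println(recordsToSkip(offsets, partitionMap, "object_key_some-object")); // 5
        System.out.println(recordsToSkip(offsets, partitionMap, "object_key_other-object")); // 0
    }
}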
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformerTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformerTest.java
index 2486cfad..91a9eacc 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformerTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ByteArrayTransformerTest.java
@@ -54,7 +54,7 @@ void testGetRecordsSingleChunk() {
         final IOSupplier inputStreamIOSupplier = () -> inputStream;
 
         final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0,
-                s3SourceConfig);
+                s3SourceConfig, 0L);
 
         final List recs = records.collect(Collectors.toList());
         assertThat(recs).hasSize(1);
@@ -68,7 +68,7 @@ void testGetRecordsEmptyInputStream() {
         final IOSupplier inputStreamIOSupplier = () -> inputStream;
 
         final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0,
-                s3SourceConfig);
+                s3SourceConfig, 0L);
 
         assertThat(records).hasSize(0);
     }
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/JsonTransformerTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/JsonTransformerTest.java
index bdf4780d..a895cc30 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/JsonTransformerTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/JsonTransformerTest.java
@@ -18,7 +18,6 @@
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.SCHEMAS_ENABLE;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -64,7 +63,7 @@ void testConfigureValueConverter() {
         final Map config = new HashMap<>();
 
         jsonTransformer.configureValueConverter(config, s3SourceConfig);
-        assertEquals("false", config.get(SCHEMAS_ENABLE), "SCHEMAS_ENABLE should be set to false");
+        assertThat(config.get(SCHEMAS_ENABLE)).as("SCHEMAS_ENABLE should be set to false").isEqualTo("false");
     }
 
     @Test
@@ -72,8 +71,8 @@ void testHandleValueDataWithValidJson() {
         final InputStream validJsonInputStream = new ByteArrayInputStream(
                 "{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8));
         final IOSupplier inputStreamIOSupplier = () -> validJsonInputStream;
-        final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1,
-                s3SourceConfig);
+        final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1, s3SourceConfig,
+                0L);
 
         assertThat(jsonNodes.collect(Collectors.toList())).hasSize(1);
     }
@@ -84,8 +83,8 @@ void testHandleValueDataWithInvalidJson() {
         final InputStream invalidJsonInputStream = new ByteArrayInputStream(
                 "invalid-json".getBytes(StandardCharsets.UTF_8));
         final IOSupplier inputStreamIOSupplier = () -> invalidJsonInputStream;
-        final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1,
-                s3SourceConfig);
+        final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1, s3SourceConfig,
+                0L);
 
         assertThat(jsonNodes.collect(Collectors.toList())).hasSize(0);
     }
@@ -95,8 +94,8 @@ void testSerializeJsonDataValid() throws IOException {
         final InputStream validJsonInputStream = new ByteArrayInputStream(
                 "{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8));
         final IOSupplier inputStreamIOSupplier = () -> validJsonInputStream;
-        final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1,
-                s3SourceConfig);
+        final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1, s3SourceConfig,
+                0L);
         final byte[] serializedData = jsonTransformer.getValueBytes(jsonNodes.findFirst().get(), TESTTOPIC,
                 s3SourceConfig);
 
@@ -109,7 +108,7 @@
     @Test
     void testGetRecordsWithIOException() throws IOException {
         when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException"));
-        final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null);
+        final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null, 0L);
 
         assertThat(resultStream).isEmpty();
     }
@@ -127,7 +126,8 @@ void testCustomSpliteratorStreamProcessing() throws IOException {
     @Test
     void testCustomSpliteratorWithIOExceptionDuringInitialization() throws IOException {
         when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException during initialization"));
-        final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null);
+        final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic",
+                0, null, 0L);
 
         assertThat(resultStream).isEmpty();
     }
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ParquetTransformerTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ParquetTransformerTest.java
index 08f46259..442e8d6c 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ParquetTransformerTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/input/ParquetTransformerTest.java
@@ -64,7 +64,7 @@ void testHandleValueDataWithZeroBytes() {
         final String topic = "test-topic";
         final int topicPartition = 0;
         final Stream recs = parquetTransformer.getRecords(inputStreamIOSupplier, topic, topicPartition,
-                s3SourceConfig);
+                s3SourceConfig, 0L);
 
         assertThat(recs).isEmpty();
     }
@@ -80,7 +80,7 @@ void testGetRecordsWithValidData() throws Exception {
         final String topic = "test-topic";
         final int topicPartition = 0;
         final List records = parquetTransformer
-                .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig)
+                .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig, 0L)
                 .collect(Collectors.toList());
 
         assertThat(records).isNotEmpty();
@@ -101,7 +101,7 @@ void testGetRecordsWithInvalidData() {
         final String topic = "test-topic";
         final int topicPartition = 0;
         final Stream records = parquetTransformer.getRecords(inputStreamIOSupplier, topic, topicPartition,
-                s3SourceConfig);
+                s3SourceConfig, 0L);
 
         assertThat(records).isEmpty();
     }
@@ -127,7 +127,7 @@ void testIOExceptionCreatingTempFile() {
         final IOSupplier inputStreamSupplier = mock(IOSupplier.class);
 
         final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", 1,
-                null);
+                null, 0L);
 
         assertThat(resultStream).isEmpty();
     }
@@ -140,7 +140,7 @@ void testIOExceptionDuringDataCopy() throws IOException {
         final IOSupplier inputStreamSupplier = () -> inputStreamMock;
 
         final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", 1,
-                null);
+                null, 0L);
 
         assertThat(resultStream).isEmpty();
     }
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java
index cc7d765c..b299bd81 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java
@@ -17,7 +17,6 @@
 package io.aiven.kafka.connect.s3.source.utils;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -90,7 +89,7 @@ void testProcessRecordsNoRecords() {
                 transformer, fileReader, offsetManager
         );
 
-        assertTrue(processedRecords.isEmpty(), "Processed records should be empty when there are no records.");
+        assertThat(processedRecords.isEmpty()).as("Processed records should be empty when there are no records.").isTrue();
     }
 
     @Test
@@ -132,7 +131,7 @@ void testProcessRecordsConnectorStopped() {
                 transformer, fileReader, offsetManager
         );
 
-        assertTrue(processedRecords.isEmpty(), "Processed records should be empty when connector is stopped.");
+        assertThat(processedRecords.isEmpty()).as("Processed records should be empty when connector is stopped.").isTrue();
         verify(sourceRecordIterator, never()).next();
     }
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
index e6ba4475..39baf9e7 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
@@ -16,10 +16,8 @@
 
 package io.aiven.kafka.connect.s3.source.utils;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyString;
@@ -79,7 +77,8 @@ void testIteratorProcessesS3Objects() throws Exception {
         when(mockS3Client.getObject(anyString(), anyString())).thenReturn(mockS3Object);
         when(mockS3Object.getObjectContent()).thenReturn(mockInputStream);
 
-        when(mockTransformer.getRecords(any(), anyString(), anyInt(), any())).thenReturn(Stream.of(new Object()));
+        when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
+                .thenReturn(Stream.of(new Object()));
 
         final String outStr = "this is a test";
         when(mockTransformer.getValueBytes(any(), anyString(), any()))
@@ -91,16 +90,16 @@ void testIteratorProcessesS3Objects() throws Exception {
         SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockS3Client, "test-bucket",
                 mockOffsetManager, mockTransformer, mockFileReader);
 
-        assertFalse(iterator.hasNext());
-        assertNull(iterator.next());
+        assertThat(iterator.hasNext()).isFalse();
+        assertThat(iterator.next()).isNull();
 
         when(mockFileReader.fetchObjectSummaries(any())).thenReturn(mockObjectSummaries.listIterator());
         iterator = new SourceRecordIterator(mockConfig, mockS3Client, "test-bucket", mockOffsetManager,
                 mockTransformer, mockFileReader);
 
-        assertTrue(iterator.hasNext());
-        assertNotNull(iterator.next());
+        assertThat(iterator.hasNext()).isTrue();
+        assertThat(iterator.next()).isNotNull();
     }
 }
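Once getRecords takes the skip count, every Mockito stub of it needs the extra argument matcher, which is what the anyLong() above adds. A hypothetical, self-contained version of that stubbing pattern, assuming Mockito and AssertJ are on the classpath; RecordSource is a toy stand-in, not the connector's Transformer interface.

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.stream.Stream;

public class SkipRecordsStubSketch {

    interface RecordSource {
        Stream<Object> getRecords(String topic, long skipRecords);
    }

    public static void main(final String[] args) {
        final RecordSource mockSource = mock(RecordSource.class);
        // The stub must now match the long argument as well, mirroring the anyLong() added in the test above.
        when(mockSource.getRecords(anyString(), anyLong())).thenReturn(Stream.of(new Object()));

        assertThat(mockSource.getRecords("test-topic", 3L)).hasSize(1);
    }
}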