Skip records in a file if already processed - dr
muralibasani committed Nov 25, 2024
1 parent a31b0ca · commit 1ec7a7a
Showing 12 changed files with 60 additions and 57 deletions.
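The pattern in this commit: each Transformer.getRecords(...) now takes a skipRecords count (looked up from the offset store per S3 object) and applies it with Stream.skip, so records consumed in an earlier poll are discarded at stream construction time instead of being filtered one by one in the iterator. A minimal, self-contained illustration of that mechanism (names here are illustrative, not the connector's):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SkipDemo {
    public static void main(String[] args) {
        long alreadyProcessed = 2L; // e.g. recovered from the Kafka Connect offset store

        List<String> remaining = Stream.of("rec0", "rec1", "rec2", "rec3")
                .skip(alreadyProcessed) // drops records consumed in a previous poll
                .collect(Collectors.toList());

        System.out.println(remaining); // [rec2, rec3]
    }
}

Note that skip is an intermediate operation: the skipped records are still read from the underlying source on traversal, but they are discarded without value conversion or per-record offset checks.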
--- changed file 1 of 12 ---
@@ -53,9 +53,9 @@ public void configureValueConverter(final Map<String, String> config, final S3So

@Override
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
- final int topicPartition, final S3SourceConfig s3SourceConfig) {
+ final int topicPartition, final S3SourceConfig s3SourceConfig, final long skipRecords) {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
- return readAvroRecordsAsStream(inputStreamIOSupplier, datumReader);
+ return readAvroRecordsAsStream(inputStreamIOSupplier, datumReader, skipRecords);
}

@Override
@@ -65,13 +65,13 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou
}

private Stream<Object> readAvroRecordsAsStream(final IOSupplier<InputStream> inputStreamIOSupplier,
- final DatumReader<GenericRecord> datumReader) {
+ final DatumReader<GenericRecord> datumReader, final long skipRecords) {
try (DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inputStreamIOSupplier.get(),
datumReader)) {
// Wrap DataFileStream in a Stream using a Spliterator for lazy processing
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(dataFileStream, Spliterator.ORDERED | Spliterator.NONNULL),
- false);
+ false).skip(skipRecords).map(record -> record);
} catch (IOException e) {
LOGGER.error("Error in DataFileStream: {}", e.getMessage(), e);
return Stream.empty(); // Return an empty stream if initialization fails
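For reference, the Avro change relies on DataFileStream implementing Iterator<GenericRecord>: the reader is wrapped via a Spliterator and skip() is chained before any terminal operation. A self-contained sketch of that wrap-and-skip shape, with a plain iterator standing in for the Avro reader:

import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorSkipSketch {
    public static void main(String[] args) {
        // Stands in for DataFileStream<GenericRecord>, which is an Iterator over records.
        Iterator<String> records = List.of("a", "b", "c", "d").iterator();

        Stream<String> stream = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(records, Spliterator.ORDERED | Spliterator.NONNULL),
                false).skip(2); // drop the two records already committed

        stream.forEach(System.out::println); // c, d
    }
}

(The trailing .map(record -> record) in the hunk above is an identity mapping; it does not affect the skip.)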
--- changed file 2 of 12 ---
@@ -40,7 +40,7 @@ public void configureValueConverter(final Map<String, String> config, final S3So

@Override
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
- final int topicPartition, final S3SourceConfig s3SourceConfig) {
+ final int topicPartition, final S3SourceConfig s3SourceConfig, final long skipRecords) {

// Create a Stream that processes each chunk lazily
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
@@ -62,7 +62,7 @@ public boolean tryAdvance(final java.util.function.Consumer<? super Object> acti
return false;
}
}
- }, false);
+ }, false).skip(skipRecords);
}

@Override
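The byte-array transformer builds its stream from an AbstractSpliterator whose tryAdvance emits one chunk per call, so skip() counts whole chunks. A runnable sketch of that pattern (chunk size and names are illustrative):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ChunkStreamSketch {
    static Stream<byte[]> chunks(InputStream in, int chunkSize, long skipRecords) {
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<byte[]>(Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super byte[]> action) {
                try {
                    byte[] buf = new byte[chunkSize];
                    int n = in.read(buf);
                    if (n < 0) {
                        return false; // end of stream
                    }
                    byte[] chunk = new byte[n];
                    System.arraycopy(buf, 0, chunk, 0, n);
                    action.accept(chunk);
                    return true;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }, false).skip(skipRecords); // each chunk counts as one "record"
    }

    public static void main(String[] args) {
        chunks(new ByteArrayInputStream("hello world".getBytes()), 4, 1L)
                .forEach(c -> System.out.println(new String(c))); // "o wo", "rld" (first chunk skipped)
    }
}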
--- changed file 3 of 12 ---
@@ -51,8 +51,8 @@ public void configureValueConverter(final Map<String, String> config, final S3So

@Override
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
- final int topicPartition, final S3SourceConfig s3SourceConfig) {
- return readJsonRecordsAsStream(inputStreamIOSupplier);
+ final int topicPartition, final S3SourceConfig s3SourceConfig, final long skipRecords) {
+ return readJsonRecordsAsStream(inputStreamIOSupplier, skipRecords);
}

@Override
@@ -65,7 +65,8 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou
}
}

- private Stream<Object> readJsonRecordsAsStream(final IOSupplier<InputStream> inputStreamIOSupplier) {
+ private Stream<Object> readJsonRecordsAsStream(final IOSupplier<InputStream> inputStreamIOSupplier,
+         final long skipRecords) {
// Use a Stream that lazily processes each line as a JSON object
CustomSpliterator customSpliteratorParam;
try {
@@ -80,7 +81,7 @@ private Stream<Object> readJsonRecordsAsStream(final IOSupplier<InputStream> inp
} catch (IOException e) {
LOGGER.error("Error closing BufferedReader: {}", e.getMessage(), e);
}
- });
+ }).skip(skipRecords);
}

/*
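One detail worth knowing about the JSON hunk: skip() is chained after onClose(), and close handlers registered earlier in a pipeline still run when the final stream is closed, so the BufferedReader cleanup is preserved. A tiny demonstration:

import java.util.stream.Stream;

public class SkipCloseSketch {
    public static void main(String[] args) {
        try (Stream<String> s = Stream.of("a", "b", "c")
                .onClose(() -> System.out.println("reader closed")) // stands in for closing the BufferedReader
                .skip(1)) {                                          // skip keeps the pipeline's close handlers
            s.forEach(System.out::println);                          // b, c
        }                                                            // prints "reader closed"
    }
}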
--- changed file 4 of 12 ---
@@ -54,8 +54,8 @@ public void configureValueConverter(final Map<String, String> config, final S3So

@Override
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
- final int topicPartition, final S3SourceConfig s3SourceConfig) {
- return getParquetStreamRecords(inputStreamIOSupplier, topic, topicPartition);
+ final int topicPartition, final S3SourceConfig s3SourceConfig, final long skipRecords) {
+ return getParquetStreamRecords(inputStreamIOSupplier, topic, topicPartition, skipRecords);
}

@Override
@@ -65,7 +65,7 @@ public byte[] getValueBytes(final Object record, final String topic, final S3Sou
}

private Stream<Object> getParquetStreamRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
- final String topic, final int topicPartition) {
+ final String topic, final int topicPartition, final long skipRecords) {
final String timestamp = String.valueOf(Instant.now().toEpochMilli());
File parquetFile;

@@ -104,7 +104,7 @@ public boolean tryAdvance(final java.util.function.Consumer<? super Object> acti
return false;
}
}
- }, false).onClose(() -> {
+ }, false).skip(skipRecords).onClose(() -> {
try {
parquetReader.close(); // Ensure reader is closed when the stream is closed
} catch (IOException e) {
--- changed file 5 of 12 ---
@@ -29,7 +29,7 @@ public interface Transformer {
void configureValueConverter(Map<String, String> config, S3SourceConfig s3SourceConfig);

Stream<Object> getRecords(IOSupplier<InputStream> inputStreamIOSupplier, String topic, int topicPartition,
- S3SourceConfig s3SourceConfig);
+ S3SourceConfig s3SourceConfig, long skipRecords);

byte[] getValueBytes(Object record, String topic, S3SourceConfig s3SourceConfig);
}
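With this interface change, every Transformer implementation owns the resume logic. A hypothetical implementation of the updated contract (illustrative class, not part of the commit; the import paths for IOSupplier and S3SourceConfig are assumptions):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.commons.io.function.IOSupplier;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

public class LineTransformer implements Transformer {

    @Override
    public void configureValueConverter(final Map<String, String> config, final S3SourceConfig s3SourceConfig) {
        // plain text lines need no converter settings
    }

    @Override
    public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
            final int topicPartition, final S3SourceConfig s3SourceConfig, final long skipRecords) {
        try {
            final BufferedReader reader = new BufferedReader(
                    new InputStreamReader(inputStreamIOSupplier.get(), StandardCharsets.UTF_8));
            return reader.lines().map(line -> (Object) line)
                    .skip(skipRecords) // resume point applied here, once, lazily
                    .onClose(() -> {
                        try {
                            reader.close();
                        } catch (IOException e) {
                            // nothing useful to do on close failure
                        }
                    });
        } catch (IOException e) {
            return Stream.empty(); // mirror the transformers above: empty stream on failure
        }
    }

    @Override
    public byte[] getValueBytes(final Object record, final String topic, final S3SourceConfig s3SourceConfig) {
        return ((String) record).getBytes(StandardCharsets.UTF_8);
    }
}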
--- changed file 6 of 12 ---
@@ -85,6 +85,19 @@ public String getObjectMapKey(final String currentObjectKey) {
return OBJECT_KEY + SEPARATOR + currentObjectKey;
}

+ public long numberOfRecordsToSkipProcessing(final Map<String, Object> partitionMap, final String currentObjectKey) {
+     if (offsets.containsKey(partitionMap)) {
+         final Map<String, Object> offsetVal = offsets.get(partitionMap);
+         final String objectMapKey = getObjectMapKey(currentObjectKey);
+
+         if (offsetVal.containsKey(objectMapKey)) {
+             return (long) offsetVal.get(objectMapKey);
+         }
+         return 0;
+     }
+     return 0;
+ }

public boolean shouldSkipRecord(final Map<String, Object> partitionMap, final String currentObjectKey,
final long numOfProcessedRecs) {
if (offsets.containsKey(partitionMap)) {
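numberOfRecordsToSkipProcessing reads a nested map: the outer key is the partition map, the inner key is the object key prefixed via getObjectMapKey, and the value is the count of records already emitted from that object. A sketch of that lookup with made-up data (the actual OBJECT_KEY and SEPARATOR constant values are not shown in this diff and are assumed here):

import java.util.HashMap;
import java.util.Map;

public class OffsetLookupSketch {
    public static void main(String[] args) {
        // Shape implied by the diff: partitionMap -> (objectMapKey -> records already emitted).
        final Map<String, Object> partitionMap = Map.of("bucket", "test-bucket", "topicPartition", 0);
        final Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        offsets.put(partitionMap, Map.of("object_key_folder/file-0001.avro", 42L));

        // getObjectMapKey(currentObjectKey) with assumed prefix/separator values
        final String objectMapKey = "object_key_" + "folder/file-0001.avro";

        long skipRecords = 0L;
        final Map<String, Object> offsetVal = offsets.get(partitionMap);
        if (offsetVal != null && offsetVal.containsKey(objectMapKey)) {
            skipRecords = (long) offsetVal.get(objectMapKey);
        }

        System.out.println(skipRecords); // 42 — handed to Transformer.getRecords as skipRecords
    }
}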
--- changed file 7 of 12 ---
@@ -131,29 +131,20 @@ private List<S3SourceRecord> readNext() {
final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8);
final List<S3SourceRecord> sourceRecords = new ArrayList<>();

- int numOfProcessedRecs = 1;
- boolean checkOffsetMap = true;
+ final long numberOfRecsAlreadyProcessed = offsetManager.numberOfRecordsToSkipProcessing(partitionMap,
+         currentObjectKey);

try (Stream<Object> recordStream = transformer.getRecords(s3Object::getObjectContent, topic,
- topicPartition, s3SourceConfig)) {
+ topicPartition, s3SourceConfig, numberOfRecsAlreadyProcessed)) {
final Iterator<Object> recordIterator = recordStream.iterator();
while (recordIterator.hasNext()) {
final Object record = recordIterator.next();

- // Check if the record should be skipped based on the offset
- if (offsetManager.shouldSkipRecord(partitionMap, currentObjectKey, numOfProcessedRecs)
-         && checkOffsetMap) {
-     numOfProcessedRecs++;
-     continue;
- }

final byte[] valueBytes = transformer.getValueBytes(record, topic, s3SourceConfig);
- checkOffsetMap = false;

sourceRecords.add(getSourceRecord(keyBytes, valueBytes, topic, topicPartition, offsetManager,
startOffset, partitionMap));

- numOfProcessedRecs++;

// Break if we have reached the max records per poll
if (sourceRecords.size() >= s3SourceConfig.getInt(MAX_POLL_RECORDS)) {
break;
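Net effect of the readNext() rewrite: the loop no longer tracks numOfProcessedRecs or consults shouldSkipRecord per record; it simply drains a stream the transformer has already positioned. A runnable toy version of the new control flow (all names local to this sketch):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class ReadNextSketch {
    public static void main(String[] args) {
        final int maxPollRecords = 2;     // stands in for s3SourceConfig.getInt(MAX_POLL_RECORDS)
        final long alreadyProcessed = 1L; // offsetManager.numberOfRecordsToSkipProcessing(...)
        final List<String> sourceRecords = new ArrayList<>();

        // The transformer hands back a stream with the skip already applied.
        try (Stream<String> recordStream = Stream.of("r0", "r1", "r2", "r3").skip(alreadyProcessed)) {
            final Iterator<String> recordIterator = recordStream.iterator();
            while (recordIterator.hasNext()) {
                sourceRecords.add(recordIterator.next());
                if (sourceRecords.size() >= maxPollRecords) {
                    break; // cap the batch, as in readNext()
                }
            }
        }

        System.out.println(sourceRecords); // [r1, r2]
    }
}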
--- changed file 8 of 12 ---
@@ -54,7 +54,7 @@ void testGetRecordsSingleChunk() {
final IOSupplier<InputStream> inputStreamIOSupplier = () -> inputStream;

final Stream<Object> records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0,
- s3SourceConfig);
+ s3SourceConfig, 0L);

final List<Object> recs = records.collect(Collectors.toList());
assertThat(recs).hasSize(1);
@@ -68,7 +68,7 @@ void testGetRecordsEmptyInputStream() {
final IOSupplier<InputStream> inputStreamIOSupplier = () -> inputStream;

final Stream<Object> records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0,
- s3SourceConfig);
+ s3SourceConfig, 0L);

assertThat(records).hasSize(0);
}
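The updated tests all pass 0L for skipRecords. A test for a nonzero resume point might look like this (hypothetical addition in the style of this class, assuming a small input yields a single chunk as in testGetRecordsSingleChunk):

@Test
void testGetRecordsSkipsAlreadyProcessed() {
    final byte[] testData = "test".getBytes(StandardCharsets.UTF_8);
    final InputStream inputStream = new ByteArrayInputStream(testData);
    final IOSupplier<InputStream> inputStreamIOSupplier = () -> inputStream;

    final Stream<Object> records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0,
            s3SourceConfig, 1L); // one record already processed in a previous poll

    assertThat(records).hasSize(0); // the single chunk is skipped
}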
--- changed file 9 of 12 ---
@@ -18,7 +18,6 @@

import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.SCHEMAS_ENABLE;
import static org.assertj.core.api.Assertions.assertThat;
- import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -64,16 +63,16 @@ void testConfigureValueConverter() {
final Map<String, String> config = new HashMap<>();

jsonTransformer.configureValueConverter(config, s3SourceConfig);
assertEquals("false", config.get(SCHEMAS_ENABLE), "SCHEMAS_ENABLE should be set to false");
assertThat(config.get(SCHEMAS_ENABLE)).as("SCHEMAS_ENABLE should be set to false").isEqualTo("false");
}

@Test
void testHandleValueDataWithValidJson() {
final InputStream validJsonInputStream = new ByteArrayInputStream(
"{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8));
final IOSupplier<InputStream> inputStreamIOSupplier = () -> validJsonInputStream;
- final Stream<Object> jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1,
-         s3SourceConfig);
+ final Stream<Object> jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1, s3SourceConfig,
+         0L);

assertThat(jsonNodes.collect(Collectors.toList())).hasSize(1);
}
@@ -84,8 +83,8 @@ void testHandleValueDataWithInvalidJson() {
"invalid-json".getBytes(StandardCharsets.UTF_8));
final IOSupplier<InputStream> inputStreamIOSupplier = () -> invalidJsonInputStream;

- final Stream<Object> jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1,
-         s3SourceConfig);
+ final Stream<Object> jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1, s3SourceConfig,
+         0L);

assertThat(jsonNodes.collect(Collectors.toList())).hasSize(0);
}
@@ -95,8 +94,8 @@ void testSerializeJsonDataValid() throws IOException {
final InputStream validJsonInputStream = new ByteArrayInputStream(
"{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8));
final IOSupplier<InputStream> inputStreamIOSupplier = () -> validJsonInputStream;
- final Stream<Object> jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1,
-         s3SourceConfig);
+ final Stream<Object> jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1, s3SourceConfig,
+         0L);
final byte[] serializedData = jsonTransformer.getValueBytes(jsonNodes.findFirst().get(), TESTTOPIC,
s3SourceConfig);

@@ -109,7 +108,7 @@
@Test
void testGetRecordsWithIOException() throws IOException {
when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException"));
- final Stream<Object> resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null);
+ final Stream<Object> resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null, 0L);

assertThat(resultStream).isEmpty();
}
@@ -127,7 +126,8 @@ void testCustomSpliteratorStreamProcessing() throws IOException {
@Test
void testCustomSpliteratorWithIOExceptionDuringInitialization() throws IOException {
when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException during initialization"));
- final Stream<Object> resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null);
+ final Stream<Object> resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic",
+         0, null, 0L);

assertThat(resultStream).isEmpty();
}
--- changed file 10 of 12 ---
@@ -64,7 +64,7 @@ void testHandleValueDataWithZeroBytes() {
final String topic = "test-topic";
final int topicPartition = 0;
final Stream<Object> recs = parquetTransformer.getRecords(inputStreamIOSupplier, topic, topicPartition,
- s3SourceConfig);
+ s3SourceConfig, 0L);

assertThat(recs).isEmpty();
}
@@ -80,7 +80,7 @@ void testGetRecordsWithValidData() throws Exception {
final int topicPartition = 0;

final List<Object> records = parquetTransformer
- .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig)
+ .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig, 0L)
.collect(Collectors.toList());

assertThat(records).isNotEmpty();
@@ -101,7 +101,7 @@ void testGetRecordsWithInvalidData() {
final int topicPartition = 0;

final Stream<Object> records = parquetTransformer.getRecords(inputStreamIOSupplier, topic, topicPartition,
- s3SourceConfig);
+ s3SourceConfig, 0L);
assertThat(records).isEmpty();
}

@@ -127,7 +127,7 @@ void testIOExceptionCreatingTempFile() {

final IOSupplier<InputStream> inputStreamSupplier = mock(IOSupplier.class);
final Stream<Object> resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", 1,
- null);
+ null, 0L);

assertThat(resultStream).isEmpty();
}
@@ -140,7 +140,7 @@ void testIOExceptionDuringDataCopy() throws IOException {

final IOSupplier<InputStream> inputStreamSupplier = () -> inputStreamMock;
final Stream<Object> resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", 1,
- null);
+ null, 0L);

assertThat(resultStream).isEmpty();
}
--- changed file 11 of 12 ---
@@ -17,7 +17,6 @@
package io.aiven.kafka.connect.s3.source.utils;

import static org.assertj.core.api.Assertions.assertThat;
- import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
@@ -90,7 +89,7 @@ void testProcessRecordsNoRecords() {
transformer, fileReader, offsetManager
);

- assertTrue(processedRecords.isEmpty(), "Processed records should be empty when there are no records.");
+ assertThat(processedRecords.isEmpty()).as("Processed records should be empty when there are no records.").isTrue();
}

@Test
@@ -132,7 +131,7 @@ void testProcessRecordsConnectorStopped() {
transformer, fileReader, offsetManager
);

- assertTrue(processedRecords.isEmpty(), "Processed records should be empty when connector is stopped.");
+ assertThat(processedRecords.isEmpty()).as("Processed records should be empty when connector is stopped.").isTrue();
verify(sourceRecordIterator, never()).next();
}

--- changed file 12 of 12 ---
@@ -16,10 +16,8 @@

package io.aiven.kafka.connect.s3.source.utils;

- import static org.junit.jupiter.api.Assertions.assertFalse;
- import static org.junit.jupiter.api.Assertions.assertNotNull;
- import static org.junit.jupiter.api.Assertions.assertNull;
- import static org.junit.jupiter.api.Assertions.assertTrue;
+ import static org.assertj.core.api.Assertions.assertThat;
+ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyString;
@@ -79,7 +77,8 @@ void testIteratorProcessesS3Objects() throws Exception {
when(mockS3Client.getObject(anyString(), anyString())).thenReturn(mockS3Object);
when(mockS3Object.getObjectContent()).thenReturn(mockInputStream);

- when(mockTransformer.getRecords(any(), anyString(), anyInt(), any())).thenReturn(Stream.of(new Object()));
+ when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
+         .thenReturn(Stream.of(new Object()));

final String outStr = "this is a test";
when(mockTransformer.getValueBytes(any(), anyString(), any()))
@@ -91,16 +90,16 @@
SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockS3Client, "test-bucket",
mockOffsetManager, mockTransformer, mockFileReader);

- assertFalse(iterator.hasNext());
- assertNull(iterator.next());
+ assertThat(iterator.hasNext()).isFalse();
+ assertThat(iterator.next()).isNull();

when(mockFileReader.fetchObjectSummaries(any())).thenReturn(mockObjectSummaries.listIterator());

iterator = new SourceRecordIterator(mockConfig, mockS3Client, "test-bucket", mockOffsetManager,
mockTransformer, mockFileReader);

- assertTrue(iterator.hasNext());
- assertNotNull(iterator.next());
+ assertThat(iterator.hasNext()).isTrue();
+ assertThat(iterator.next()).isNotNull();
}
}

Expand Down
