Commit

FileReader improvements
AnatolyPopov committed Nov 8, 2024
1 parent 5084cbf commit 7aecceb
Showing 1 changed file with 19 additions and 57 deletions.
@@ -18,25 +18,20 @@
 
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.FETCH_PAGE_SIZE;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class FileReader {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FileReader.class);
     public static final int PAGE_SIZE_FACTOR = 2;
     private final S3SourceConfig s3SourceConfig;
     private final String bucketName;
@@ -50,60 +45,27 @@ public FileReader(final S3SourceConfig s3SourceConfig, final String bucketName,
         this.failedObjectKeys = new HashSet<>(failedObjectKeys);
     }
 
-    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
     Iterator<S3ObjectSummary> fetchObjectSummaries(final AmazonS3 s3Client) {
-        return new Iterator<>() {
-            private String continuationToken = null; // NOPMD
-            private List<S3ObjectSummary> currentBatch = new ArrayList<>();
-            private int currentIndex = 0; // NOPMD
-            private boolean isTruncated = true;
-
-            @Override
-            public boolean hasNext() {
-                // If there are unprocessed objects in the current batch, we return true
-                if (currentIndex < currentBatch.size()) {
-                    return true;
-                }
-
-                if (isTruncated) {
-                    fetchNextBatch();
-                    return !currentBatch.isEmpty();
-                }
-
-                return false;
-            }
-
-            @Override
-            public S3ObjectSummary next() {
-                if (!hasNext()) {
-                    return null;
-                }
-
-                return currentBatch.get(currentIndex++);
-            }
-
-            private void fetchNextBatch() {
-                currentBatch.clear();
-                currentIndex = 0;
-
-                final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
-                        .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR)
-                        .withContinuationToken(continuationToken);
-
-                final ListObjectsV2Result objectListing = s3Client.listObjectsV2(request);
-                currentBatch = objectListing.getObjectSummaries()
+        final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
+                .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR);
+
+        final Stream<S3ObjectSummary> s3ObjectStream = Stream
+                .iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
+                    if (response.isTruncated()) {
+                        return s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName(bucketName)
+                                .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR)
+                                .withContinuationToken(response.getNextContinuationToken()));
+                    } else {
+                        return null;
+                    }
+                })
+                .flatMap(response -> response.getObjectSummaries()
                         .stream()
                         .filter(objectSummary -> objectSummary.getSize() > 0)
-                        .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))
-                        .collect(Collectors.toList());
-
-                continuationToken = objectListing.getNextContinuationToken();
-                isTruncated = objectListing.isTruncated();
-
-                currentBatch.forEach(objSummary -> LOGGER.debug("Objects to be processed {} ", objSummary.getKey()));
-            }
-        };
+                        .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey())));
+        return s3ObjectStream.iterator();
     }
 
     public void addFailedObjectKeys(final String objectKey) {
         this.failedObjectKeys.add(objectKey);
     }
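
The rewritten fetchObjectSummaries replaces the hand-rolled batching Iterator with the three-argument Stream.iterate(seed, hasNext, next) overload (available since Java 9): pages of ListObjectsV2 results are requested only while the previous response is truncated, with the continuation token threaded into each follow-up request. The sketch below is a minimal, standalone illustration of that pagination idiom with the AWS SDK v1 client; the class name, bucket name, and page size are placeholders chosen for illustration and are not part of this commit.

import java.util.Iterator;
import java.util.Objects;
import java.util.stream.Stream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.S3ObjectSummary;

// Hypothetical example class, not part of the commit.
public class S3PaginationSketch {

    public static void main(final String[] args) {
        final AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();
        final String bucketName = "example-bucket"; // placeholder
        final int maxKeys = 1000; // placeholder page size

        // Seed the stream with the first page, keep fetching while the previous
        // response is truncated, and stop once the next-page function returns null.
        final Stream<S3ObjectSummary> summaries = Stream
                .iterate(s3Client.listObjectsV2(new ListObjectsV2Request()
                        .withBucketName(bucketName)
                        .withMaxKeys(maxKeys)), Objects::nonNull, response -> {
                    if (!response.isTruncated()) {
                        return null;
                    }
                    return s3Client.listObjectsV2(new ListObjectsV2Request()
                            .withBucketName(bucketName)
                            .withMaxKeys(maxKeys)
                            .withContinuationToken(response.getNextContinuationToken()));
                })
                .flatMap(response -> response.getObjectSummaries().stream());

        // The stream is lazy: a new page is requested only as the iterator advances
        // past the summaries already returned.
        final Iterator<S3ObjectSummary> iterator = summaries.iterator();
        while (iterator.hasNext()) {
            System.out.println(iterator.next().getKey());
        }
    }
}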
