From 4a4a57171aef14bf247543811bbe103a5d90b63d Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Tue, 29 Oct 2024 11:40:01 +0000 Subject: [PATCH] Reduce S3 memory usage by clearing records already sent to S3 Signed-off-by: Aindriu Lavelle --- .../common/grouper/GroupedSinkRecord.java | 77 +++++++++++++++++++ .../KeyAndTopicPartitionRecordGrouper.java | 6 ++ .../common/grouper/KeyRecordGrouper.java | 6 ++ .../connect/common/grouper/RecordGrouper.java | 10 ++- .../TopicPartitionKeyRecordGrouper.java | 44 +++++++---- .../grouper/TopicPartitionRecordGrouper.java | 47 +++++++---- .../io/aiven/kafka/connect/s3/S3SinkTask.java | 14 ++-- 7 files changed, 166 insertions(+), 38 deletions(-) create mode 100644 commons/src/main/java/io/aiven/kafka/connect/common/grouper/GroupedSinkRecord.java diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/GroupedSinkRecord.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/GroupedSinkRecord.java new file mode 100644 index 00000000..0c358cc9 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/GroupedSinkRecord.java @@ -0,0 +1,77 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.connect.sink.SinkRecord; + +public class GroupedSinkRecord { + + private int numberOfRecords; + final private List sinkRecords; + final private String filename; + final private long recordCreationDate = System.currentTimeMillis(); + + public GroupedSinkRecord(final String filename) { + this.filename = filename; + sinkRecords = new ArrayList<>(); + numberOfRecords = 0; + } + + public GroupedSinkRecord(final String filename, final List sinkRecords) { + this.filename = filename; + this.sinkRecords = new ArrayList<>(sinkRecords); + numberOfRecords = sinkRecords.size(); + } + public GroupedSinkRecord(final String filename, final SinkRecord sinkRecord) { + this.filename = filename; + this.sinkRecords = new ArrayList<>(); + this.sinkRecords.add(sinkRecord); + numberOfRecords = 1; + } + + public void addSinkRecord(final SinkRecord sinkRecord) { + this.sinkRecords.add(sinkRecord); + this.numberOfRecords++; + } + + public List getSinkRecords() { + // Ensure access to the Sink Records can only be changed through the apis and not accidentally by another + // process. + return Collections.unmodifiableList(sinkRecords); + } + + public void removeSinkRecords(final List sinkRecords) { + this.sinkRecords.removeAll(sinkRecords); + } + + public int getNumberOfRecords() { + return numberOfRecords; + } + + public String getFilename() { + return filename; + } + + public long getRecordCreationDate() { + return recordCreationDate; + } + +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java index 43ce6c2f..b2402385 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyAndTopicPartitionRecordGrouper.java @@ -103,6 +103,12 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + // One entry per file, so the entire file can be removed to reduce memory overhead. + fileBuffers.remove(identifier); + } + @Override public Map> records() { return Collections.unmodifiableMap(fileBuffers); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java index b9af899e..5ba409b8 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/KeyRecordGrouper.java @@ -90,6 +90,12 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + // One record per file, so remove the entry to reduce memory + fileBuffers.remove(identifier); + } + @Override public Map> records() { return Collections.unmodifiableMap(fileBuffers); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java index 1e0e6c18..2126e0a6 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java @@ -38,10 +38,18 @@ public interface RecordGrouper { */ void clear(); + /** + * Clear processed records from memory + * + * @param records + * all records already processed to Sink + */ + void clearProcessedRecords(String identifier, List records); + /** * Get all records associated with files, grouped by the file name. * - * @return map of records assotiated with files + * @return map of records associated with files */ Map> records(); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java index 55e09763..872ce00d 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouper.java @@ -17,13 +17,13 @@ package io.aiven.kafka.connect.common.grouper; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; @@ -34,6 +34,8 @@ import io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter; +import org.apache.commons.lang3.tuple.Pair; + public class TopicPartitionKeyRecordGrouper implements RecordGrouper { private static final Map TIMESTAMP_FORMATTERS = Map.of("yyyy", @@ -42,13 +44,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper { private final Template filenameTemplate; - private final Map currentHeadRecords = new HashMap<>(); + private final Map> currentHeadRecords = new HashMap<>(); - private final Map> fileBuffers = new HashMap<>(); + private final Map fileBuffers = new HashMap<>(); private final Function> setTimestampBasedOnRecord; - private final Rotator> rotator; + private final Rotator rotator; TopicPartitionKeyRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile, final TimestampSource tsSource) { @@ -64,7 +66,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper { if (unlimited) { return false; } else { - return buffer == null || buffer.size() >= maxRecordsPerFile; + return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile; } }; } @@ -73,7 +75,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper { public void put(final SinkRecord record) { Objects.requireNonNull(record, "record cannot be null"); final String recordKey = resolveRecordKeyFor(record); - fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record); + fileBuffers.computeIfAbsent(recordKey, ignored -> new GroupedSinkRecord(recordKey)).addSinkRecord(record); } protected String resolveRecordKeyFor(final SinkRecord record) { @@ -81,7 +83,8 @@ protected String resolveRecordKeyFor(final SinkRecord record) { final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); - final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record); + final Pair currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, + ignored -> Pair.of(record.kafkaOffset(), record.kafkaPartition())); String objectKey = generateObjectKey(tpk, currentHeadRecord, record); if (rotator.rotate(fileBuffers.get(objectKey))) { // Create new file using this record as the head record. @@ -102,14 +105,14 @@ private String recordKey(final SinkRecord record) { return key; } - public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord, + public String generateObjectKey(final TopicPartitionKey tpk, final Pair headRecord, final SinkRecord currentRecord) { final Function setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%020d", headRecord.kafkaOffset()) - : Long.toString(headRecord.kafkaOffset()); + ? String.format("%020d", headRecord.getLeft()) + : Long.toString(headRecord.getLeft()); final Function setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%010d", headRecord.kafkaPartition()) - : Long.toString(headRecord.kafkaPartition()); + ? String.format("%010d", headRecord.getRight()) + : Long.toString(headRecord.getRight()); return filenameTemplate.instance() .bindVariable(FilenameTemplateVariable.TOPIC.name, tpk.topicPartition::topic) @@ -123,8 +126,8 @@ public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord he protected String generateNewRecordKey(final SinkRecord record) { final var key = recordKey(record); final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key); - currentHeadRecords.put(tpk, record); - return generateObjectKey(tpk, record, record); + currentHeadRecords.put(tpk, Pair.of(record.kafkaOffset(), record.kafkaPartition())); + return generateObjectKey(tpk, Pair.of(record.kafkaOffset(), record.kafkaPartition()), record); } @Override @@ -133,9 +136,20 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + final GroupedSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null); + if (Objects.isNull(grouperRecord)) { + return; + } + grouperRecord.removeSinkRecords(records); + } + @Override public Map> records() { - return Collections.unmodifiableMap(fileBuffers); + return Collections.unmodifiableMap(fileBuffers.values() + .stream() + .collect(Collectors.toMap(GroupedSinkRecord::getFilename, GroupedSinkRecord::getSinkRecords))); } public static class TopicPartitionKey { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java index 5a080e0a..6fb22e7a 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouper.java @@ -17,13 +17,13 @@ package io.aiven.kafka.connect.common.grouper; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; @@ -33,6 +33,8 @@ import io.aiven.kafka.connect.common.templating.Template; import io.aiven.kafka.connect.common.templating.VariableTemplatePart.Parameter; +import org.apache.commons.lang3.tuple.Pair; + /** * A {@link RecordGrouper} that groups records by topic and partition. * @@ -50,14 +52,14 @@ class TopicPartitionRecordGrouper implements RecordGrouper { DateTimeFormatter.ofPattern("dd"), "HH", DateTimeFormatter.ofPattern("HH")); private final Template filenameTemplate; + // Offsets are a Long and Partitions are an Integer + private final Map> currentHeadRecords = new HashMap<>(); - private final Map currentHeadRecords = new HashMap<>(); - - private final Map> fileBuffers = new HashMap<>(); + private final Map fileBuffers = new HashMap<>(); private final Function> setTimestampBasedOnRecord; - private final Rotator> rotator; + private final Rotator rotator; /** * A constructor. @@ -83,7 +85,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper { if (unlimited) { return false; } else { - return buffer == null || buffer.size() >= maxRecordsPerFile; + return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile; } }; } @@ -92,28 +94,30 @@ class TopicPartitionRecordGrouper implements RecordGrouper { public void put(final SinkRecord record) { Objects.requireNonNull(record, "record cannot be null"); final String recordKey = resolveRecordKeyFor(record); - fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record); + fileBuffers.computeIfAbsent(recordKey, ignored -> new GroupedSinkRecord(recordKey)).addSinkRecord(record); } protected String resolveRecordKeyFor(final SinkRecord record) { final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, ignored -> record); + final Pair currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, + ignored -> Pair.of(record.kafkaOffset(), record.kafkaPartition())); String recordKey = generateRecordKey(topicPartition, currentHeadRecord, record); if (rotator.rotate(fileBuffers.get(recordKey))) { // Create new file using this record as the head record. recordKey = generateNewRecordKey(record); } + return recordKey; } - private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord, + private String generateRecordKey(final TopicPartition topicPartition, final Pair headRecord, final SinkRecord currentRecord) { final Function setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%020d", headRecord.kafkaOffset()) - : Long.toString(headRecord.kafkaOffset()); + ? String.format("%020d", headRecord.getLeft()) + : Long.toString(headRecord.getLeft()); final Function setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean() - ? String.format("%010d", headRecord.kafkaPartition()) - : Long.toString(headRecord.kafkaPartition()); + ? String.format("%010d", headRecord.getRight()) + : Long.toString(headRecord.getRight()); return filenameTemplate.instance() .bindVariable(FilenameTemplateVariable.TOPIC.name, topicPartition::topic) @@ -125,8 +129,8 @@ private String generateRecordKey(final TopicPartition topicPartition, final Sink protected String generateNewRecordKey(final SinkRecord record) { final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - currentHeadRecords.put(topicPartition, record); - return generateRecordKey(topicPartition, record, record); + currentHeadRecords.put(topicPartition, Pair.of(record.kafkaOffset(), record.kafkaPartition())); + return generateRecordKey(topicPartition, Pair.of(record.kafkaOffset(), record.kafkaPartition()), record); } @Override @@ -135,9 +139,20 @@ public void clear() { fileBuffers.clear(); } + @Override + public void clearProcessedRecords(final String identifier, final List records) { + final GroupedSinkRecord grouperRecord = fileBuffers.getOrDefault(identifier, null); + if (Objects.isNull(grouperRecord)) { + return; + } + grouperRecord.removeSinkRecords(records); + } + @Override public Map> records() { - return Collections.unmodifiableMap(fileBuffers); + return Collections.unmodifiableMap(fileBuffers.values() + .stream() + .collect(Collectors.toMap(GroupedSinkRecord::getFilename, GroupedSinkRecord::getSinkRecords))); } } diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java index 6dc7560e..f99cafeb 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -134,7 +133,7 @@ public void put(final Collection records) { LOGGER.info("Processing {} records", records.size()); records.forEach(recordGrouper::put); if (!isKeyRecordGrouper) { - recordGrouper.records().forEach((filename, groupedRecords) -> writeToS3(filename, groupedRecords, records)); + recordGrouper.records().forEach(this::writeToS3); } } @@ -212,8 +211,11 @@ private OutputWriter getOutputWriter(final String filename, final SinkRecord sin * @param records * all records in this record grouping, including those already written to S3 */ - private void writeToS3(final String filename, final List records, - final Collection recordToBeWritten) { + private void writeToS3(final String filename, final List records) { + // If no new records are supplied in this put operation return immediately + if (records.isEmpty()) { + return; + } final SinkRecord sinkRecord = records.get(0); // This writer is being left open until a flush occurs. final OutputWriter writer; // NOPMD CloseResource @@ -221,8 +223,8 @@ private void writeToS3(final String filename, final List records, writer = getOutputWriter(filename, sinkRecord); // Record Grouper returns all records for that filename, all we want is the new batch of records to be added // to the multi part upload. - writer.writeRecords(records.stream().filter(recordToBeWritten::contains).collect(Collectors.toList())); - + writer.writeRecords(records); + recordGrouper.clearProcessedRecords(filename, records); } catch (IOException e) { throw new ConnectException(e); }