Commit 837b66a

Update to move OffsetManager and remove some unnecessary config links from S3SourceConfig to AbstractConfig

Signed-off-by: Aindriu Lavelle <[email protected]>
aindriu-aiven committed Nov 27, 2024
1 parent 549e852 commit 837b66a
Showing 18 changed files with 143 additions and 90 deletions.
@@ -14,49 +14,42 @@
* limitations under the License.
*/

package io.aiven.kafka.connect.s3.source.utils;
package io.aiven.kafka.connect.common.source.offsets;

import static java.util.stream.Collectors.toMap;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.connect.source.SourceTaskContext;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OffsetManager {

private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);
public static final String SEPARATOR = "_";
// TODO look using map maker to eject keys which are no longer needed. (i.e. there is only a requirement to actually
// have the last file completed in memory plus the current file)
// Guava's CacheBuilder might be a good choice
private final Map<Map<String, Object>, Map<String, Object>> offsets;

public OffsetManager(final SourceTaskContext context, final S3SourceConfig s3SourceConfig) {
final String s3Bucket = s3SourceConfig.getString(S3SourceConfig.AWS_S3_BUCKET_NAME_CONFIG);
final Set<Integer> partitions = parsePartitions(s3SourceConfig);
final Set<String> topics = parseTopics(s3SourceConfig);

// Build the partition keys and fetch offsets from offset storage
final List<Map<String, Object>> partitionKeys = buildPartitionKeys(s3Bucket, partitions, topics);
final Map<Map<String, Object>, Map<String, Object>> offsetMap = context.offsetStorageReader()
.offsets(partitionKeys);
public OffsetManager(final SourceTaskContext context, final List<Map<String, Object>> partitionKeys) {

LOGGER.info(" ********** offsetMap ***** {}", offsetMap);
this.offsets = offsetMap.entrySet()
this.offsets = context.offsetStorageReader()
.offsets(partitionKeys)
.entrySet()
.stream()
.filter(e -> e.getValue() != null)
.collect(toMap(entry -> new HashMap<>(entry.getKey()), entry -> new HashMap<>(entry.getValue())));
LOGGER.info(" ********** offsets ***** {}", offsets);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug(" ********** offsets ***** {}", offsets);
}
}

/**
@@ -83,7 +76,7 @@ public <T extends OffsetManagerEntry> T getEntry(final OffsetManagerKey key,
return creator.apply(offsets.get(key.getPartitionMap()));
}

void updateCurrentOffsets(final OffsetManagerEntry entry) {
public void updateCurrentOffsets(final OffsetManagerEntry entry) {
offsets.compute(entry.getManagerKey().getPartitionMap(), (k, v) -> {
if (v == null) {
return new HashMap<>(entry.getProperties());
@@ -94,28 +87,6 @@ void updateCurrentOffsets(final OffsetManagerEntry entry) {
});
}

// TODO move this to S3OffsetManager creation.
private static Set<Integer> parsePartitions(final S3SourceConfig s3SourceConfig) {
final String partitionString = s3SourceConfig.getString(S3SourceConfig.TARGET_TOPIC_PARTITIONS);
return Arrays.stream(partitionString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
}

// TODO move this to S3OffsetManager creation.
private static Set<String> parseTopics(final S3SourceConfig s3SourceConfig) {
final String topicString = s3SourceConfig.getString(S3SourceConfig.TARGET_TOPICS);
return Arrays.stream(topicString.split(",")).collect(Collectors.toSet());
}

// TODO move this to S3OffsetManager creation. May not be needed.
private static List<Map<String, Object>> buildPartitionKeys(final String bucket, final Set<Integer> partitions,
final Set<String> topics) {
final List<Map<String, Object>> partitionKeys = new ArrayList<>();
partitions.forEach(partition -> topics.forEach(topic -> {
partitionKeys.add(ConnectUtils.getPartitionMap(topic, partition, bucket));
}));
return partitionKeys;
}

/**
* The definition of an entry in the OffsetManager.
*/
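Note: the helpers removed above (parsePartitions, parseTopics, buildPartitionKeys) carry TODOs pointing at S3OffsetManager creation, and the new OffsetManager constructor now takes the prepared partition keys directly. The class below is only a sketch of what that S3-side construction could look like; whether the S3OffsetManager introduced by this commit actually extends OffsetManager or wraps it is not visible in the hunks shown here.

/*
 * Sketch only, not taken from this diff: one plausible shape for S3OffsetManager,
 * rebuilding the partition keys from S3SourceConfig and delegating to the relocated
 * OffsetManager(SourceTaskContext, List<Map<String, Object>>) constructor.
 */
package io.aiven.kafka.connect.s3.source.utils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.connect.source.SourceTaskContext;

import io.aiven.kafka.connect.common.source.offsets.OffsetManager;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

public class S3OffsetManager extends OffsetManager {

    public S3OffsetManager(final SourceTaskContext context, final S3SourceConfig config) {
        super(context, buildPartitionKeys(config.getString(S3SourceConfig.AWS_S3_BUCKET_NAME_CONFIG),
                parsePartitions(config), parseTopics(config)));
    }

    // Same parsing the removed OffsetManager helpers performed, relocated to the S3 module.
    private static Set<Integer> parsePartitions(final S3SourceConfig config) {
        return Arrays.stream(config.getString(S3SourceConfig.TARGET_TOPIC_PARTITIONS).split(","))
                .map(Integer::parseInt)
                .collect(Collectors.toSet());
    }

    private static Set<String> parseTopics(final S3SourceConfig config) {
        return Arrays.stream(config.getString(S3SourceConfig.TARGET_TOPICS).split(","))
                .collect(Collectors.toSet());
    }

    private static List<Map<String, Object>> buildPartitionKeys(final String bucket,
            final Set<Integer> partitions, final Set<String> topics) {
        final List<Map<String, Object>> partitionKeys = new ArrayList<>();
        partitions.forEach(partition -> topics.forEach(
                topic -> partitionKeys.add(ConnectUtils.getPartitionMap(topic, partition, bucket))));
        return partitionKeys;
    }
}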
@@ -16,6 +16,7 @@

package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.common.source.offsets.OffsetManager.SEPARATOR;
import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_ACCESS_KEY_ID_CONFIG;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_BUCKET_NAME_CONFIG;
@@ -28,7 +29,6 @@
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.TARGET_TOPIC_PARTITIONS;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.VALUE_CONVERTER_SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.VALUE_SERIALIZER;
import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR;
import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@@ -35,13 +35,14 @@
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;

import io.aiven.kafka.connect.common.source.offsets.OffsetManager;
import io.aiven.kafka.connect.s3.source.config.S3ClientFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.input.Transformer;
import io.aiven.kafka.connect.s3.source.input.TransformerFactory;
import io.aiven.kafka.connect.s3.source.utils.FileReader;
import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
import io.aiven.kafka.connect.s3.source.utils.RecordProcessor;
import io.aiven.kafka.connect.s3.source.utils.S3OffsetManager;
import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
@@ -124,7 +125,7 @@ public void start(final Map<String, String> props) {
initializeConverters();
initializeS3Client();
transformer = TransformerFactory.getTransformer(s3SourceConfig);
offsetManager = new OffsetManager(context, s3SourceConfig);
offsetManager = new S3OffsetManager(context, s3SourceConfig);
final String s3Bucket = s3SourceConfig.getString(AWS_S3_BUCKET_NAME_CONFIG);
fileReader = new FileReader(s3SourceConfig, s3Bucket, failedObjectKeys);
prepareReaderFromOffsetStorageReader();
@@ -29,7 +29,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import org.apache.kafka.common.config.AbstractConfig;

import com.amazonaws.util.IOUtils;
import org.apache.avro.file.DataFileReader;
@@ -47,21 +47,21 @@ public class AvroTransformer implements Transformer {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroTransformer.class);

@Override
public void configureValueConverter(final Map<String, String> config, final S3SourceConfig s3SourceConfig) {
config.put(SCHEMA_REGISTRY_URL, s3SourceConfig.getString(SCHEMA_REGISTRY_URL));
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}

@Override
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final S3SourceConfig s3SourceConfig) {
final int topicPartition, final AbstractConfig sourceConfig) {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
return readAvroRecordsAsStream(inputStreamIOSupplier, datumReader);
}

@Override
public byte[] getValueBytes(final Object record, final String topic, final S3SourceConfig s3SourceConfig) {
public byte[] getValueBytes(final Object record, final String topic, final AbstractConfig sourceConfig) {
return TransformationUtils.serializeAvroRecordToBytes(Collections.singletonList((GenericRecord) record), topic,
s3SourceConfig);
sourceConfig);
}

private Stream<Object> readAvroRecordsAsStream(final IOSupplier<InputStream> inputStreamIOSupplier,
@@ -24,7 +24,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import org.apache.kafka.common.config.AbstractConfig;

import org.apache.commons.io.function.IOSupplier;
import org.slf4j.Logger;
@@ -34,13 +34,13 @@ public class ByteArrayTransformer implements Transformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class);

@Override
public void configureValueConverter(final Map<String, String> config, final S3SourceConfig s3SourceConfig) {
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
// For byte array transformations, ByteArrayConverter is the converter which is the default config.
}

@Override
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final S3SourceConfig s3SourceConfig) {
final int topicPartition, final AbstractConfig sourceConfig) {

// Create a Stream that processes each chunk lazily
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
@@ -66,7 +66,7 @@ public boolean tryAdvance(final java.util.function.Consumer<? super Object> acti
}

@Override
public byte[] getValueBytes(final Object record, final String topic, final S3SourceConfig s3SourceConfig) {
public byte[] getValueBytes(final Object record, final String topic, final AbstractConfig sourceConfig) {
return (byte[]) record;
}
}
@@ -29,7 +29,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import org.apache.kafka.common.config.AbstractConfig;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -45,18 +45,18 @@ public class JsonTransformer implements Transformer {
final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configureValueConverter(final Map<String, String> config, final S3SourceConfig s3SourceConfig) {
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
config.put(SCHEMAS_ENABLE, "false");
}

@Override
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final S3SourceConfig s3SourceConfig) {
final int topicPartition, final AbstractConfig sourceConfig) {
return readJsonRecordsAsStream(inputStreamIOSupplier);
}

@Override
public byte[] getValueBytes(final Object record, final String topic, final S3SourceConfig s3SourceConfig) {
public byte[] getValueBytes(final Object record, final String topic, final AbstractConfig sourceConfig) {
try {
return objectMapper.writeValueAsBytes(record);
} catch (JsonProcessingException e) {
@@ -32,7 +32,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import org.apache.kafka.common.config.AbstractConfig;

import org.apache.avro.generic.GenericRecord;
import org.apache.commons.compress.utils.IOUtils;
@@ -48,20 +48,20 @@ public class ParquetTransformer implements Transformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ParquetTransformer.class);

@Override
public void configureValueConverter(final Map<String, String> config, final S3SourceConfig s3SourceConfig) {
config.put(SCHEMA_REGISTRY_URL, s3SourceConfig.getString(SCHEMA_REGISTRY_URL));
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}

@Override
public Stream<Object> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final S3SourceConfig s3SourceConfig) {
final int topicPartition, final AbstractConfig sourceConfig) {
return getParquetStreamRecords(inputStreamIOSupplier, topic, topicPartition);
}

@Override
public byte[] getValueBytes(final Object record, final String topic, final S3SourceConfig s3SourceConfig) {
public byte[] getValueBytes(final Object record, final String topic, final AbstractConfig sourceConfig) {
return TransformationUtils.serializeAvroRecordToBytes(Collections.singletonList((GenericRecord) record), topic,
s3SourceConfig);
sourceConfig);
}

private Stream<Object> getParquetStreamRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.Map;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import org.apache.kafka.common.config.AbstractConfig;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
@@ -41,11 +41,11 @@ private TransformationUtils() {
}

static byte[] serializeAvroRecordToBytes(final List<GenericRecord> avroRecords, final String topic,
final S3SourceConfig s3SourceConfig) {
final AbstractConfig sourceConfig) {
final Map<String, String> config = Collections.singletonMap(SCHEMA_REGISTRY_URL,
s3SourceConfig.getString(SCHEMA_REGISTRY_URL));
sourceConfig.getString(SCHEMA_REGISTRY_URL));

try (KafkaAvroSerializer avroSerializer = (KafkaAvroSerializer) s3SourceConfig.getClass(VALUE_SERIALIZER)
try (KafkaAvroSerializer avroSerializer = (KafkaAvroSerializer) sourceConfig.getClass(VALUE_SERIALIZER)
.getDeclaredConstructor()
.newInstance(); ByteArrayOutputStream out = new ByteArrayOutputStream()) {
avroSerializer.configure(config, false);
@@ -20,18 +20,18 @@
import java.util.Map;
import java.util.stream.Stream;

import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import org.apache.kafka.common.config.AbstractConfig;

import org.apache.commons.io.function.IOSupplier;

public interface Transformer {

// TODO make this method accept an S3OffsetManagerEntry and update the values in the configuration directly.
void configureValueConverter(Map<String, String> config, S3SourceConfig s3SourceConfig);
void configureValueConverter(Map<String, String> config, AbstractConfig sourceConfig);

// TODO make this method accept an S3OffsetManagerEntry to retrieve the topic an topicParitiion.
Stream<Object> getRecords(IOSupplier<InputStream> inputStreamIOSupplier, String topic, int topicPartition,
S3SourceConfig s3SourceConfig);
AbstractConfig sourceConfig);

byte[] getValueBytes(Object record, String topic, S3SourceConfig s3SourceConfig);
byte[] getValueBytes(Object record, String topic, AbstractConfig sourceConfig);
}
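Note: with the interface above narrowed from S3SourceConfig to AbstractConfig, a Transformer no longer pulls in the S3 module. The fragment below is illustrative only (imports omitted; it assumes S3SourceConfig extends AbstractConfig and that an InputStream for the object is already available) and is not part of this commit.

// Illustrative fragment: any AbstractConfig-based source config now satisfies the Transformer contract.
static long countRecords(final S3SourceConfig s3SourceConfig, final InputStream inputStream) {
    final AbstractConfig sourceConfig = s3SourceConfig; // assumes S3SourceConfig extends AbstractConfig

    // Factory call as used in S3SourceTask.start() above.
    final Transformer transformer = TransformerFactory.getTransformer(s3SourceConfig);

    final Map<String, String> converterConfig = new HashMap<>();
    transformer.configureValueConverter(converterConfig, sourceConfig);

    // Topic and partition values are placeholders.
    return transformer.getRecords(() -> inputStream, "example-topic", 0, sourceConfig).count();
}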
@@ -28,6 +28,7 @@
import java.util.function.Predicate;
import java.util.stream.Stream;

import io.aiven.kafka.connect.common.source.offsets.OffsetManager;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.input.Transformer;
