-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add support for AWS glue schema registry (#3083)
- Loading branch information
Dexter Lee
authored
Aug 12, 2021
1 parent
8c9c696
commit 2df9d4f
Showing
17 changed files
with
386 additions
and
199 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
83 changes: 83 additions & 0 deletions
83
gms/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package com.linkedin.gms.factory.kafka; | ||
|
||
import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory; | ||
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory; | ||
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; | ||
import java.time.Duration; | ||
import java.util.Arrays; | ||
import java.util.Map; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.avro.generic.GenericRecord; | ||
import org.apache.kafka.common.serialization.StringDeserializer; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.beans.factory.annotation.Qualifier; | ||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | ||
import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.context.annotation.Import; | ||
import org.springframework.context.annotation.Lazy; | ||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | ||
import org.springframework.kafka.config.KafkaListenerContainerFactory; | ||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | ||
|
||
|
||
@Slf4j | ||
@Configuration | ||
@EnableConfigurationProperties(KafkaProperties.class) | ||
@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class}) | ||
public class KafkaEventConsumerFactory { | ||
|
||
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") | ||
private String kafkaBootstrapServers; | ||
|
||
@Value("${SCHEMA_REGISTRY_TYPE:KAFKA}") | ||
private String schemaRegistryType; | ||
|
||
@Autowired | ||
@Lazy | ||
@Qualifier("kafkaSchemaRegistry") | ||
private SchemaRegistryConfig kafkaSchemaRegistryConfig; | ||
|
||
@Autowired | ||
@Lazy | ||
@Qualifier("awsGlueSchemaRegistry") | ||
private SchemaRegistryConfig awsGlueSchemaRegistryConfig; | ||
|
||
@Bean(name = "kafkaEventConsumer") | ||
protected KafkaListenerContainerFactory<?> createInstance(KafkaProperties properties) { | ||
|
||
KafkaProperties.Consumer consumerProps = properties.getConsumer(); | ||
|
||
// Specify (de)serializers for record keys and for record values. | ||
consumerProps.setKeyDeserializer(StringDeserializer.class); | ||
// Records will be flushed every 10 seconds. | ||
consumerProps.setEnableAutoCommit(true); | ||
consumerProps.setAutoCommitInterval(Duration.ofSeconds(10)); | ||
|
||
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS | ||
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { | ||
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); | ||
} // else we rely on KafkaProperties which defaults to localhost:9092 | ||
|
||
SchemaRegistryConfig schemaRegistryConfig; | ||
if (schemaRegistryType.equals(KafkaSchemaRegistryFactory.TYPE)) { | ||
schemaRegistryConfig = kafkaSchemaRegistryConfig; | ||
} else { | ||
schemaRegistryConfig = awsGlueSchemaRegistryConfig; | ||
} | ||
|
||
consumerProps.setValueDeserializer(schemaRegistryConfig.getDeserializer()); | ||
Map<String, Object> props = properties.buildConsumerProperties(); | ||
props.putAll(schemaRegistryConfig.getProperties()); | ||
|
||
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = | ||
new ConcurrentKafkaListenerContainerFactory<>(); | ||
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); | ||
|
||
log.info("Event-based KafkaListenerContainerFactory built successfully"); | ||
|
||
return factory; | ||
} | ||
} |
45 changes: 35 additions & 10 deletions
45
...ory/common/KafkaEventProducerFactory.java → ...tory/kafka/KafkaEventProducerFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,44 +1,69 @@ | ||
package com.linkedin.gms.factory.common; | ||
package com.linkedin.gms.factory.kafka; | ||
|
||
import io.confluent.kafka.serializers.KafkaAvroSerializer; | ||
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; | ||
import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory; | ||
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory; | ||
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig; | ||
import java.util.Arrays; | ||
import java.util.Map; | ||
import org.apache.avro.generic.IndexedRecord; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.common.serialization.StringSerializer; | ||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.beans.factory.annotation.Qualifier; | ||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | ||
import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.context.annotation.Import; | ||
import org.springframework.context.annotation.Lazy; | ||
|
||
|
||
// Spring configuration that declares the "kafkaEventProducer" bean: a Kafka Producer with String keys | ||
// and Avro IndexedRecord values. | ||
// NOTE(review): this listing appears to be a rendered diff in which removed and added lines are | ||
// interleaved without +/- markers (for example both the kafkaSchemaRegistryUrl and schemaRegistryType | ||
// fields, and two return statements) -- confirm against the committed file before relying on it. | ||
@Configuration | ||
@EnableConfigurationProperties(KafkaProperties.class) | ||
@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class}) | ||
public class KafkaEventProducerFactory { | ||
|
||
// Comma-separated bootstrap server list; split on "," in createInstance. | ||
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") | ||
private String kafkaBootstrapServers; | ||
|
||
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") | ||
private String kafkaSchemaRegistryUrl; | ||
// Selects which SchemaRegistryConfig is used; compared against KafkaSchemaRegistryFactory.TYPE. | ||
@Value("${SCHEMA_REGISTRY_TYPE:KAFKA}") | ||
private String schemaRegistryType; | ||
|
||
// Lazily injected registry configuration for the "kafkaSchemaRegistry" bean. | ||
@Autowired | ||
@Lazy | ||
@Qualifier("kafkaSchemaRegistry") | ||
private SchemaRegistryConfig kafkaSchemaRegistryConfig; | ||
|
||
// Lazily injected registry configuration for the "awsGlueSchemaRegistry" bean. | ||
@Autowired | ||
@Lazy | ||
@Qualifier("awsGlueSchemaRegistry") | ||
private SchemaRegistryConfig awsGlueSchemaRegistryConfig; | ||
|
||
// Builds the producer from the Spring Boot KafkaProperties producer section plus the properties | ||
// contributed by the selected schema registry configuration. | ||
@Bean(name = "kafkaEventProducer") | ||
protected Producer<String, IndexedRecord> createInstance(KafkaProperties properties) { | ||
KafkaProperties.Producer producerProps = properties.getProducer(); | ||
|
||
producerProps.setKeySerializer(StringSerializer.class); | ||
producerProps.setValueSerializer(KafkaAvroSerializer.class); | ||
|
||
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS | ||
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { | ||
producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); | ||
} // else we rely on KafkaProperties which defaults to localhost:9092 | ||
|
||
// Any type other than KafkaSchemaRegistryFactory.TYPE falls through to the AWS Glue configuration. | ||
SchemaRegistryConfig schemaRegistryConfig; | ||
if (schemaRegistryType.equals(KafkaSchemaRegistryFactory.TYPE)) { | ||
schemaRegistryConfig = kafkaSchemaRegistryConfig; | ||
} else { | ||
schemaRegistryConfig = awsGlueSchemaRegistryConfig; | ||
} | ||
|
||
Map<String, Object> props = properties.buildProducerProperties(); | ||
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); | ||
|
||
return new KafkaProducer(props); | ||
// The value serializer class name and registry-specific settings are applied to the built map. | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName()); | ||
props.putAll(schemaRegistryConfig.getProperties()); | ||
|
||
return new KafkaProducer<>(props); | ||
} | ||
} |
55 changes: 55 additions & 0 deletions
55
gms/factories/src/main/java/com/linkedin/gms/factory/kafka/SimpleKafkaConsumerFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package com.linkedin.gms.factory.kafka; | ||
|
||
import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactory; | ||
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory; | ||
import java.time.Duration; | ||
import java.util.Arrays; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.avro.generic.GenericRecord; | ||
import org.apache.kafka.common.serialization.StringDeserializer; | ||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | ||
import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.context.annotation.Import; | ||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | ||
import org.springframework.kafka.config.KafkaListenerContainerFactory; | ||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | ||
|
||
|
||
@Slf4j | ||
@Configuration | ||
@EnableConfigurationProperties(KafkaProperties.class) | ||
@Import({KafkaSchemaRegistryFactory.class, AwsGlueSchemaRegistryFactory.class}) | ||
public class SimpleKafkaConsumerFactory { | ||
|
||
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}") | ||
private String kafkaBootstrapServers; | ||
|
||
@Bean(name = "simpleKafkaConsumer") | ||
protected KafkaListenerContainerFactory<?> createInstance(KafkaProperties properties) { | ||
|
||
KafkaProperties.Consumer consumerProps = properties.getConsumer(); | ||
|
||
// Specify (de)serializers for record keys and for record values. | ||
consumerProps.setKeyDeserializer(StringDeserializer.class); | ||
consumerProps.setValueDeserializer(StringDeserializer.class); | ||
// Records will be flushed every 10 seconds. | ||
consumerProps.setEnableAutoCommit(true); | ||
consumerProps.setAutoCommitInterval(Duration.ofSeconds(10)); | ||
|
||
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS | ||
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { | ||
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); | ||
} // else we rely on KafkaProperties which defaults to localhost:9092 | ||
|
||
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = | ||
new ConcurrentKafkaListenerContainerFactory<>(); | ||
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties())); | ||
|
||
log.info("Simple KafkaListenerContainerFactory built successfully"); | ||
|
||
return factory; | ||
} | ||
} |
41 changes: 41 additions & 0 deletions
41
...main/java/com/linkedin/gms/factory/kafka/schemaregistry/AwsGlueSchemaRegistryFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package com.linkedin.gms.factory.kafka.schemaregistry; | ||
|
||
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; | ||
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer; | ||
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; | ||
import com.amazonaws.services.schemaregistry.utils.AvroRecordType; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import javax.annotation.Nonnull; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
|
||
|
||
@Slf4j | ||
@Configuration | ||
public class AwsGlueSchemaRegistryFactory { | ||
|
||
public static final String TYPE = "AWS_GLUE"; | ||
|
||
@Value("${AWS_GLUE_SCHEMA_REGISTRY_REGION:us-east-1}") | ||
private String awsRegion; | ||
@Value("${AWS_GLUE_SCHEMA_REGISTRY_NAME:#{null}}") | ||
private Optional<String> registryName; | ||
|
||
@Bean(name = "awsGlueSchemaRegistry") | ||
@Nonnull | ||
protected SchemaRegistryConfig getInstance() { | ||
Map<String, Object> props = new HashMap<>(); | ||
props.put(AWSSchemaRegistryConstants.AWS_REGION, awsRegion); | ||
props.put(AWSSchemaRegistryConstants.DATA_FORMAT, "AVRO"); | ||
props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); | ||
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); | ||
registryName.ifPresent(s -> props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, s)); | ||
log.info("Creating AWS Glue registry"); | ||
return new SchemaRegistryConfig(GlueSchemaRegistryKafkaSerializer.class, GlueSchemaRegistryKafkaDeserializer.class, | ||
props); | ||
} | ||
} |
Oops, something went wrong.