diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java index 7ce70ec..1f3ce03 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java @@ -6,7 +6,7 @@ public interface KafkaProducerFactory { - Producer createProducer(String clientId, String bootstrapServer, + Producer createProducer(String clientId, String bootstrapServer, Map optionalProperties); } diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java index 610e663..b881b35 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java @@ -3,21 +3,25 @@ import java.util.Map; import java.util.Properties; +import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class KafkaStandardProducerFactory implements KafkaProducerFactory { + private static final Logger log = LoggerFactory.getLogger(KafkaStandardProducerFactory.class); + @Override - public Producer createProducer(String clientId, String bootstrapServer, + public Producer createProducer(String clientId, String bootstrapServer, Map optionalProperties) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); props.putAll(optionalProperties); return new KafkaProducer<>(props); diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java new file mode 100644 index 0000000..fd455d5 --- /dev/null +++ b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java @@ -0,0 +1,38 @@ +package com.github.snuk87.keycloak.kafka.serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; + +public class JsonSerializer implements Serializer { + private final ObjectMapper objectMapper = new ObjectMapper(); + + public JsonSerializer() { + + } + + @Override + public void configure(Map config, boolean isKey) { + + } + + @Override + public byte[] serialize(String topic, Object data) { + if (data == null) { + return null; + } + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { + + } +} diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java similarity index 77% rename from src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java rename to src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java index dc52ff3..6a41e54 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java @@ -1,4 +1,4 @@ -package com.github.snuk87.keycloak.kafka; +package com.github.snuk87.keycloak.kafka.spi; import java.util.ArrayList; import java.util.List; @@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.github.snuk87.keycloak.kafka.KafkaProducerFactory; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -17,9 +18,6 @@ import org.keycloak.events.EventType; import org.keycloak.events.admin.AdminEvent; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - public class KafkaEventListenerProvider implements EventListenerProvider { private static final Logger LOG = Logger.getLogger(KafkaEventListenerProvider.class); @@ -30,9 +28,7 @@ public class KafkaEventListenerProvider implements EventListenerProvider { private String topicAdminEvents; - private Producer producer; - - private ObjectMapper mapper; + private Producer producer; public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events, String topicAdminEvents, Map kafkaProducerProperties, KafkaProducerFactory factory) { @@ -50,13 +46,12 @@ public KafkaEventListenerProvider(String bootstrapServers, String clientId, Stri } producer = factory.createProducer(clientId, bootstrapServers, kafkaProducerProperties); - mapper = new ObjectMapper(); } - private void produceEvent(String eventAsString, String topic) + private void produceEvent(final Object event, final String topic) throws InterruptedException, ExecutionException, TimeoutException { LOG.debug("Produce to topic: " + topicEvents + " ..."); - ProducerRecord record = new ProducerRecord<>(topic, eventAsString); + final ProducerRecord record = new ProducerRecord<>(topic, event); Future metaData = producer.send(record); RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS); LOG.debug("Produced to topic: " + recordMetadata.topic()); @@ -66,22 +61,22 @@ private void produceEvent(String eventAsString, String topic) public void onEvent(Event event) { if (events.contains(event.getType())) { try { - produceEvent(mapper.writeValueAsString(event), topicEvents); - } catch (JsonProcessingException | ExecutionException | TimeoutException e) { + this.produceEvent(event, topicEvents); + } catch (ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } catch (InterruptedException e) { LOG.error(e.getMessage(), e); Thread.currentThread().interrupt(); } - } + } } @Override public void onEvent(AdminEvent event, boolean includeRepresentation) { if (topicAdminEvents != null) { try { - produceEvent(mapper.writeValueAsString(event), topicAdminEvents); - } catch (JsonProcessingException | ExecutionException | TimeoutException e) { + this.produceEvent(event, topicAdminEvents); + } catch (ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } catch (InterruptedException e) { LOG.error(e.getMessage(), e); diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProviderFactory.java similarity index 92% rename from src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java rename to src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProviderFactory.java index 3d4f39c..c961fc9 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProviderFactory.java @@ -1,7 +1,9 @@ -package com.github.snuk87.keycloak.kafka; +package com.github.snuk87.keycloak.kafka.spi; import java.util.Map; +import com.github.snuk87.keycloak.kafka.KafkaProducerConfig; +import com.github.snuk87.keycloak.kafka.KafkaStandardProducerFactory; import org.jboss.logging.Logger; import org.keycloak.Config.Scope; import org.keycloak.events.EventListenerProvider; diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java index de5307e..66ea508 100644 --- a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java @@ -6,6 +6,7 @@ import java.lang.reflect.Field; import java.util.Map; +import com.github.snuk87.keycloak.kafka.spi.KafkaEventListenerProvider; import org.apache.kafka.clients.producer.MockProducer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java index bdc6a3b..64f7ea3 100644 --- a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java @@ -2,6 +2,7 @@ import java.util.Map; +import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringSerializer; @@ -9,9 +10,9 @@ class KafkaMockProducerFactory implements KafkaProducerFactory { @Override - public Producer createProducer(String clientId, String bootstrapServer, + public Producer createProducer(String clientId, String bootstrapServer, Map optionalProperties) { - return new MockProducer<>(true, new StringSerializer(), new StringSerializer()); + return new MockProducer<>(true, new StringSerializer(), new JsonSerializer()); } }