Skip to content

Commit

Permalink
SnuK87#46 - Implement KafkaSerializer for JSON serialization instead …
Browse files Browse the repository at this point in the history
…using explicit object mapping
  • Loading branch information
ttimot24 committed Sep 18, 2024
1 parent e86a615 commit 8abc8c0
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

public interface KafkaProducerFactory {

Producer<String, String> createProducer(String clientId, String bootstrapServer,
Producer<String, Object> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties);

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@
import java.util.Map;
import java.util.Properties;

import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaStandardProducerFactory implements KafkaProducerFactory {

private static final Logger log = LoggerFactory.getLogger(KafkaStandardProducerFactory.class);

@Override
public Producer<String, String> createProducer(String clientId, String bootstrapServer,
public Producer<String, Object> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
props.putAll(optionalProperties);

return new KafkaProducer<>(props);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.github.snuk87.keycloak.kafka.serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class JsonSerializer implements Serializer<Object> {
private final ObjectMapper objectMapper = new ObjectMapper();

public JsonSerializer() {

}

@Override
public void configure(Map<String, ?> config, boolean isKey) {

}

@Override
public byte[] serialize(String topic, Object data) {
if (data == null) {
return null;
}
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException("Error serializing JSON message", e);
}
}

@Override
public void close() {

}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.github.snuk87.keycloak.kafka;
package com.github.snuk87.keycloak.kafka.spi;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -8,6 +8,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.github.snuk87.keycloak.kafka.KafkaProducerFactory;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand All @@ -17,9 +18,6 @@
import org.keycloak.events.EventType;
import org.keycloak.events.admin.AdminEvent;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class KafkaEventListenerProvider implements EventListenerProvider {

private static final Logger LOG = Logger.getLogger(KafkaEventListenerProvider.class);
Expand All @@ -30,9 +28,7 @@ public class KafkaEventListenerProvider implements EventListenerProvider {

private String topicAdminEvents;

private Producer<String, String> producer;

private ObjectMapper mapper;
private Producer<String, Object> producer;

public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events,
String topicAdminEvents, Map<String, Object> kafkaProducerProperties, KafkaProducerFactory factory) {
Expand All @@ -50,13 +46,12 @@ public KafkaEventListenerProvider(String bootstrapServers, String clientId, Stri
}

producer = factory.createProducer(clientId, bootstrapServers, kafkaProducerProperties);
mapper = new ObjectMapper();
}

private void produceEvent(String eventAsString, String topic)
private void produceEvent(final Object event, final String topic)
throws InterruptedException, ExecutionException, TimeoutException {
LOG.debug("Produce to topic: " + topicEvents + " ...");
ProducerRecord<String, String> record = new ProducerRecord<>(topic, eventAsString);
final ProducerRecord<String, Object> record = new ProducerRecord<>(topic, event);
Future<RecordMetadata> metaData = producer.send(record);
RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS);
LOG.debug("Produced to topic: " + recordMetadata.topic());
Expand All @@ -66,22 +61,22 @@ private void produceEvent(String eventAsString, String topic)
public void onEvent(Event event) {
if (events.contains(event.getType())) {
try {
produceEvent(mapper.writeValueAsString(event), topicEvents);
} catch (JsonProcessingException | ExecutionException | TimeoutException e) {
this.produceEvent(event, topicEvents);
} catch (ExecutionException | TimeoutException e) {
LOG.error(e.getMessage(), e);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
}
}

@Override
public void onEvent(AdminEvent event, boolean includeRepresentation) {
if (topicAdminEvents != null) {
try {
produceEvent(mapper.writeValueAsString(event), topicAdminEvents);
} catch (JsonProcessingException | ExecutionException | TimeoutException e) {
this.produceEvent(event, topicAdminEvents);
} catch (ExecutionException | TimeoutException e) {
LOG.error(e.getMessage(), e);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.github.snuk87.keycloak.kafka;
package com.github.snuk87.keycloak.kafka.spi;

import java.util.Map;

import com.github.snuk87.keycloak.kafka.KafkaProducerConfig;
import com.github.snuk87.keycloak.kafka.KafkaStandardProducerFactory;
import org.jboss.logging.Logger;
import org.keycloak.Config.Scope;
import org.keycloak.events.EventListenerProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.lang.reflect.Field;
import java.util.Map;

import com.github.snuk87.keycloak.kafka.spi.KafkaEventListenerProvider;
import org.apache.kafka.clients.producer.MockProducer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

import java.util.Map;

import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

class KafkaMockProducerFactory implements KafkaProducerFactory {

@Override
public Producer<String, String> createProducer(String clientId, String bootstrapServer,
public Producer<String, Object> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties) {
return new MockProducer<>(true, new StringSerializer(), new StringSerializer());
return new MockProducer<>(true, new StringSerializer(), new JsonSerializer());
}

}

0 comments on commit 8abc8c0

Please sign in to comment.