Skip to content

Commit

Permalink
feat: 샵 좋아요 이벤트에 MessagePackSerializer, MessagePackDeserializer 추가
Browse files Browse the repository at this point in the history
  • Loading branch information
sssukho committed Jan 15, 2025
1 parent 501302d commit 09f9504
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.beuatify_project.bp_common.event.ShopLikeCancelEvent;
import com.beuatify_project.bp_common.event.ShopLikeEvent;
import com.beuatify_project.bp_common.event.SignUpCertificationMailEvent;
import com.beuatify_project.bp_common.serializer.MessagePackSerializer;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -14,7 +15,6 @@
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@EnableKafka
@Configuration
Expand All @@ -28,7 +28,10 @@ public Map<String, Object> producerConfig() {
return Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configProperties.getBroker(),
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MessagePackSerializer.class
// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,

);
}

@Bean(name = "ShopLikeEventProducerFactory")
Expand Down
3 changes: 3 additions & 0 deletions bp-common/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
dependencies {
implementation project(':bp-utils')
implementation 'jakarta.validation:jakarta.validation-api:3.0.2'
implementation 'org.msgpack:msgpack-core:0.9.9'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.2'
implementation 'org.apache.kafka:kafka-clients:3.7.1'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.beuatify_project.bp_common.serializer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;

public class MessagePackDeserializer<T> implements Deserializer<T> {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final Class<T> targetClass;

public MessagePackDeserializer(final Class<T> targetClass) {
this.targetClass = targetClass;
}

@Override
public T deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(data)) {
// JSON 문자열로 변환된 데이터를 다시 POJO로 역직렬화
String jsonString = unpacker.unpackString();
// JSON -> Object 변환 (Jackson 또는 Gson 사용 가능)
return OBJECT_MAPPER.readValue(jsonString, targetClass);
} catch (Exception e) {
throw new RuntimeException("Error deserializing object from MessagePack", e);
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.beuatify_project.bp_common.serializer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;

public class MessagePackSerializer<T> implements Serializer<T> {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Override
public byte[] serialize(final String s, final T data) {
if (data == null) {
return null;
}

try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) {
// Object를 JSON으로 변환 후 MessagePack으로 직렬화
packer.packString(OBJECT_MAPPER.writeValueAsString(data));
return packer.toByteArray();
} catch (Exception e) {
throw new RuntimeException("Failed to serialize object to MessagePack", e);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,31 @@
import com.beuatify_project.bp_common.event.ShopLikeCancelEvent;
import com.beuatify_project.bp_common.event.ShopLikeEvent;
import com.beuatify_project.bp_common.event.SignUpCertificationMailEvent;
import com.beuatify_project.bp_common.serializer.MessagePackDeserializer;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

@EnableKafka
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumerConfig {

private static final String TRUSTED_PACKAGES = "com.beautify_project.bp_kafka_event_consumer.event";
private static final String TRUSTED_PACKAGES = "com.beuatify_project.bp_common.event";
private static final String TOPIC_CONFIG_NAME_SHOP_LIKE_EVENT = "SHOP-LIKE-EVENT";
private static final String TOPIC_CONFIG_NAME_SHOP_LIKE_CANCEL_EVENT = "SHOP-LIKE-CANCEL-EVENT";
private static final String TOPIC_CONFIG_NAME_SIGNUP_CERTIFICATION_MAIL_EVENT = "MAIL-SIGN-UP-CERTIFICATION-EVENT";
Expand All @@ -38,12 +43,10 @@ public Map<String, Object> shopLikeEventConsumerConfig() {
configProperties.getTopic().get(TOPIC_CONFIG_NAME_SHOP_LIKE_EVENT).getGroupId(),

ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
// 역직렬화 실패 무한 로그 방지
ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class,
ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class, // 역직렬화 실패 무한 로그 방지

ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class,
// 역직렬화 실패 무한 로그 방지
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessagePackDeserializer.class,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class, // 역직렬화 실패 무한 로그 방지

ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
configProperties.getTopic().get(TOPIC_CONFIG_NAME_SHOP_LIKE_EVENT).getBatchSize(),
Expand All @@ -56,8 +59,8 @@ public Map<String, Object> shopLikeEventConsumerConfig() {
@Bean(name = "shopLikeEventConsumerFactory")
public ConsumerFactory<String, ShopLikeEvent> shopLikeEventConsumerFactory() {
// 들어오는 record 를 객체로 받기 위한 deserializer
final JsonDeserializer<ShopLikeEvent> deserializer = new JsonDeserializer<>(
ShopLikeEvent.class, false);
final MessagePackDeserializer<ShopLikeEvent> deserializer = new MessagePackDeserializer<>(
ShopLikeEvent.class);

return new DefaultKafkaConsumerFactory<>(shopLikeEventConsumerConfig(), new StringDeserializer(),
deserializer);
Expand All @@ -68,6 +71,14 @@ public ConcurrentKafkaListenerContainerFactory<String, ShopLikeEvent> shopLikeEv
ConcurrentKafkaListenerContainerFactory<String, ShopLikeEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(shopLikeEventConsumerFactory());
factory.setBatchListener(true);
factory.setCommonErrorHandler(new DefaultErrorHandler((record, exception) -> {
if (exception instanceof SerializationException || exception instanceof IllegalStateException) {
log.error(
"Skip event due to deserialization error: topic - {} | partition - {} | value - {}",
record.topic(), record.partition(), record.value());
// TODO: 별도의 큐 처리 또는 추가 로직으로 처리 필요
}
}));
return factory;
}

Expand Down

0 comments on commit 09f9504

Please sign in to comment.