From 09f95047252d4d64bf5fc80ffcb5d400ee3b54cd Mon Sep 17 00:00:00 2001 From: sssukho Date: Thu, 16 Jan 2025 05:30:38 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=83=B5=20=EC=A2=8B=EC=95=84=EC=9A=94?= =?UTF-8?q?=20=EC=9D=B4=EB=B2=A4=ED=8A=B8=EC=97=90=20MessagePackSerializer?= =?UTF-8?q?,=20MessagePackDeserializer=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/KafkaProducerConfig.java | 7 ++-- bp-common/build.gradle | 3 ++ .../serializer/MessagePackDeserializer.java | 32 +++++++++++++++++++ .../serializer/MessagePackSerializer.java | 27 ++++++++++++++++ .../config/KafkaConsumerConfig.java | 27 +++++++++++----- 5 files changed, 86 insertions(+), 10 deletions(-) create mode 100644 bp-common/src/main/java/com/beuatify_project/bp_common/serializer/MessagePackDeserializer.java create mode 100644 bp-common/src/main/java/com/beuatify_project/bp_common/serializer/MessagePackSerializer.java diff --git a/bp-app-api/src/main/java/com/beautify_project/bp_app_api/config/KafkaProducerConfig.java b/bp-app-api/src/main/java/com/beautify_project/bp_app_api/config/KafkaProducerConfig.java index f7678e4..4621e82 100644 --- a/bp-app-api/src/main/java/com/beautify_project/bp_app_api/config/KafkaProducerConfig.java +++ b/bp-app-api/src/main/java/com/beautify_project/bp_app_api/config/KafkaProducerConfig.java @@ -4,6 +4,7 @@ import com.beuatify_project.bp_common.event.ShopLikeCancelEvent; import com.beuatify_project.bp_common.event.ShopLikeEvent; import com.beuatify_project.bp_common.event.SignUpCertificationMailEvent; +import com.beuatify_project.bp_common.serializer.MessagePackSerializer; import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerConfig; @@ -14,7 +15,6 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.serializer.JsonSerializer; @EnableKafka @Configuration @@ -28,7 +28,10 @@ public Map producerConfig() { return Map.of( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configProperties.getBroker(), ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MessagePackSerializer.class +// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class, + + ); } @Bean(name = "ShopLikeEventProducerFactory") diff --git a/bp-common/build.gradle b/bp-common/build.gradle index deb8246..8dfb438 100644 --- a/bp-common/build.gradle +++ b/bp-common/build.gradle @@ -1,4 +1,7 @@ dependencies { implementation project(':bp-utils') implementation 'jakarta.validation:jakarta.validation-api:3.0.2' + implementation 'org.msgpack:msgpack-core:0.9.9' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.2' + implementation 'org.apache.kafka:kafka-clients:3.7.1' } diff --git a/bp-common/src/main/java/com/beuatify_project/bp_common/serializer/MessagePackDeserializer.java b/bp-common/src/main/java/com/beuatify_project/bp_common/serializer/MessagePackDeserializer.java new file mode 100644 index 0000000..cb59609 --- /dev/null +++ b/bp-common/src/main/java/com/beuatify_project/bp_common/serializer/MessagePackDeserializer.java @@ -0,0 +1,32 @@ +package com.beuatify_project.bp_common.serializer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.serialization.Deserializer; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; + +public class MessagePackDeserializer implements Deserializer { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final Class targetClass; + + public MessagePackDeserializer(final Class targetClass) { + this.targetClass = targetClass; + } + + @Override + public T deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(data)) { + // JSON 문자열로 변환된 데이터를 다시 POJO로 역직렬화 + String jsonString = unpacker.unpackString(); + // JSON -> Object 변환 (Jackson 또는 Gson 사용 가능) + return OBJECT_MAPPER.readValue(jsonString, targetClass); + } catch (Exception e) { + throw new RuntimeException("Error deserializing object from MessagePack", e); + } + } +} + diff --git a/bp-common/src/main/java/com/beuatify_project/bp_common/serializer/MessagePackSerializer.java b/bp-common/src/main/java/com/beuatify_project/bp_common/serializer/MessagePackSerializer.java new file mode 100644 index 0000000..f7abc7c --- /dev/null +++ b/bp-common/src/main/java/com/beuatify_project/bp_common/serializer/MessagePackSerializer.java @@ -0,0 +1,27 @@ +package com.beuatify_project.bp_common.serializer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.serialization.Serializer; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; + +public class MessagePackSerializer implements Serializer { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public byte[] serialize(final String s, final T data) { + if (data == null) { + return null; + } + + try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { + // Object를 JSON으로 변환 후 MessagePack으로 직렬화 + packer.packString(OBJECT_MAPPER.writeValueAsString(data)); + return packer.toByteArray(); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize object to MessagePack", e); + } + + } +} diff --git a/bp-kafka-event-consumer/src/main/java/com/beautify_project/bp_kafka_event_consumer/config/KafkaConsumerConfig.java b/bp-kafka-event-consumer/src/main/java/com/beautify_project/bp_kafka_event_consumer/config/KafkaConsumerConfig.java index cccd2a5..d910651 100644 --- a/bp-kafka-event-consumer/src/main/java/com/beautify_project/bp_kafka_event_consumer/config/KafkaConsumerConfig.java +++ b/bp-kafka-event-consumer/src/main/java/com/beautify_project/bp_kafka_event_consumer/config/KafkaConsumerConfig.java @@ -4,9 +4,12 @@ import com.beuatify_project.bp_common.event.ShopLikeCancelEvent; import com.beuatify_project.bp_common.event.ShopLikeEvent; import com.beuatify_project.bp_common.event.SignUpCertificationMailEvent; +import com.beuatify_project.bp_common.serializer.MessagePackDeserializer; import java.util.Map; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -14,6 +17,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; @@ -21,9 +25,10 @@ @EnableKafka @Configuration @RequiredArgsConstructor +@Slf4j public class KafkaConsumerConfig { - private static final String TRUSTED_PACKAGES = "com.beautify_project.bp_kafka_event_consumer.event"; + private static final String TRUSTED_PACKAGES = "com.beuatify_project.bp_common.event"; private static final String TOPIC_CONFIG_NAME_SHOP_LIKE_EVENT = "SHOP-LIKE-EVENT"; private static final String TOPIC_CONFIG_NAME_SHOP_LIKE_CANCEL_EVENT = "SHOP-LIKE-CANCEL-EVENT"; private static final String TOPIC_CONFIG_NAME_SIGNUP_CERTIFICATION_MAIL_EVENT = "MAIL-SIGN-UP-CERTIFICATION-EVENT"; @@ -38,12 +43,10 @@ public Map shopLikeEventConsumerConfig() { configProperties.getTopic().get(TOPIC_CONFIG_NAME_SHOP_LIKE_EVENT).getGroupId(), ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, - // 역직렬화 실패 무한 로그 방지 - ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class, + ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class, // 역직렬화 실패 무한 로그 방지 - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class, - // 역직렬화 실패 무한 로그 방지 - ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessagePackDeserializer.class, + ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class, // 역직렬화 실패 무한 로그 방지 ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configProperties.getTopic().get(TOPIC_CONFIG_NAME_SHOP_LIKE_EVENT).getBatchSize(), @@ -56,8 +59,8 @@ public Map shopLikeEventConsumerConfig() { @Bean(name = "shopLikeEventConsumerFactory") public ConsumerFactory shopLikeEventConsumerFactory() { // 들어오는 record 를 객체로 받기 위한 deserializer - final JsonDeserializer deserializer = new JsonDeserializer<>( - ShopLikeEvent.class, false); + final MessagePackDeserializer deserializer = new MessagePackDeserializer<>( + ShopLikeEvent.class); return new DefaultKafkaConsumerFactory<>(shopLikeEventConsumerConfig(), new StringDeserializer(), deserializer); @@ -68,6 +71,14 @@ public ConcurrentKafkaListenerContainerFactory shopLikeEv ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(shopLikeEventConsumerFactory()); factory.setBatchListener(true); + factory.setCommonErrorHandler(new DefaultErrorHandler((record, exception) -> { + if (exception instanceof SerializationException || exception instanceof IllegalStateException) { + log.error( + "Skip event due to deserialization error: topic - {} | partition - {} | value - {}", + record.topic(), record.partition(), record.value()); + // TODO: 별도의 큐 처리 또는 추가 로직으로 처리 필요 + } + })); return factory; }