diff --git a/order-service/src/main/java/by/devtools/order/service/impl/KafkaProducer.java b/order-service/src/main/java/by/devtools/order/service/impl/KafkaProducer.java new file mode 100644 index 0000000..146a4fa --- /dev/null +++ b/order-service/src/main/java/by/devtools/order/service/impl/KafkaProducer.java @@ -0,0 +1,30 @@ +package by.devtools.order.service.impl; + +import by.devtools.order.util.JsonUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; + +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Service +@RequiredArgsConstructor +public class KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + public void sendMessage(String topic, T payload) { + CompletableFuture> future = kafkaTemplate.send(topic, JsonUtil.toJson(payload)); + future.whenComplete((result, ex) -> { + if (ex == null) { + log.info("Successfully sent message={} to topic={}", payload, topic); + } else { + log.warn("Error occurred when sending message={} to topic={}. Exception message: {} ", + payload, topic, ex.getMessage()); + } + }); + } +} diff --git a/order-service/src/main/java/by/devtools/order/service/impl/OrderServiceImpl.java b/order-service/src/main/java/by/devtools/order/service/impl/OrderServiceImpl.java index d626e9b..def748a 100644 --- a/order-service/src/main/java/by/devtools/order/service/impl/OrderServiceImpl.java +++ b/order-service/src/main/java/by/devtools/order/service/impl/OrderServiceImpl.java @@ -8,7 +8,6 @@ import by.devtools.order.model.Order; import by.devtools.order.repository.OrderRepository; import by.devtools.order.service.OrderService; -import by.devtools.order.util.JsonUtil; import by.devtools.order.util.ServiceNames; import lombok.RequiredArgsConstructor; import org.springframework.kafka.core.KafkaTemplate; @@ -23,6 +22,7 @@ public class OrderServiceImpl implements OrderService { private final OrderMapper orderMapper; + private final KafkaProducer kafkaProducer; private final KafkaTemplate kafkaTemplate; private final OrderRepository orderRepository; @@ -91,7 +91,7 @@ private boolean allServicesStatusesAccepted(Order order) { private void sendRollback(Order order) { OrderDto orderDto = orderMapper.orderToDto(order); - kafkaTemplate.send("rollback-topic", JsonUtil.toJson(orderDto)); + kafkaProducer.sendMessage("rollback-topic", orderDto); } /** diff --git a/order-service/src/test/java/by/devtools/order/service/impl/OrderServiceImplTest.java b/order-service/src/test/java/by/devtools/order/service/impl/OrderServiceImplTest.java index 081e216..60e886c 100644 --- a/order-service/src/test/java/by/devtools/order/service/impl/OrderServiceImplTest.java +++ b/order-service/src/test/java/by/devtools/order/service/impl/OrderServiceImplTest.java @@ -16,7 +16,6 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.kafka.core.KafkaTemplate; import java.util.Optional; @@ -34,7 +33,7 @@ class OrderServiceImplTest { private OrderMapper orderMapper; @Mock - private KafkaTemplate kafkaTemplate; + private KafkaProducer kafkaProducer; @Mock private OrderRepository orderRepository; @@ -257,7 +256,7 @@ void check_processResultEvent_forBothRejected_should_sendRollbackToKafka() { verify(orderRepository).findById(TestData.ID); verify(orderRepository).save(orderCapture.capture()); - verify(kafkaTemplate).send(eq("rollback-topic"), any()); + verify(kafkaProducer).sendMessage(eq("rollback-topic"), any()); assertThat(orderCapture.getValue()).isEqualTo(expectedOrderToSave); } @@ -283,7 +282,7 @@ void check_processResultEvent_forAcceptedInventory_should_sendRollbackToKafka() verify(orderRepository).findById(TestData.ID); verify(orderRepository).save(orderCapture.capture()); - verify(kafkaTemplate).send(eq("rollback-topic"), any()); + verify(kafkaProducer).sendMessage(eq("rollback-topic"), any()); assertThat(orderCapture.getValue()).isEqualTo(expectedOrderToSave); }