Skip to content

Commit

Permalink
refactor: add separate class for send messages to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
KiryaHandsome committed Oct 22, 2023
1 parent 2818aec commit 1566de2
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package by.devtools.order.service.impl;

import by.devtools.order.util.JsonUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducer {

private final KafkaTemplate<Integer, String> kafkaTemplate;

public <T> void sendMessage(String topic, T payload) {
CompletableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, JsonUtil.toJson(payload));
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Successfully sent message={} to topic={}", payload, topic);
} else {
log.warn("Error occurred when sending message={} to topic={}. Exception message: {} ",
payload, topic, ex.getMessage());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import by.devtools.order.model.Order;
import by.devtools.order.repository.OrderRepository;
import by.devtools.order.service.OrderService;
import by.devtools.order.util.JsonUtil;
import by.devtools.order.util.ServiceNames;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
Expand All @@ -23,6 +22,7 @@
public class OrderServiceImpl implements OrderService {

private final OrderMapper orderMapper;
private final KafkaProducer kafkaProducer;
private final KafkaTemplate<Integer, String> kafkaTemplate;
private final OrderRepository orderRepository;

Expand Down Expand Up @@ -91,7 +91,7 @@ private boolean allServicesStatusesAccepted(Order order) {

private void sendRollback(Order order) {
OrderDto orderDto = orderMapper.orderToDto(order);
kafkaTemplate.send("rollback-topic", JsonUtil.toJson(orderDto));
kafkaProducer.sendMessage("rollback-topic", orderDto);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.Optional;

Expand All @@ -34,7 +33,7 @@ class OrderServiceImplTest {
private OrderMapper orderMapper;

@Mock
private KafkaTemplate<Integer, String> kafkaTemplate;
private KafkaProducer kafkaProducer;

@Mock
private OrderRepository orderRepository;
Expand Down Expand Up @@ -257,7 +256,7 @@ void check_processResultEvent_forBothRejected_should_sendRollbackToKafka() {

verify(orderRepository).findById(TestData.ID);
verify(orderRepository).save(orderCapture.capture());
verify(kafkaTemplate).send(eq("rollback-topic"), any());
verify(kafkaProducer).sendMessage(eq("rollback-topic"), any());

assertThat(orderCapture.getValue()).isEqualTo(expectedOrderToSave);
}
Expand All @@ -283,7 +282,7 @@ void check_processResultEvent_forAcceptedInventory_should_sendRollbackToKafka()

verify(orderRepository).findById(TestData.ID);
verify(orderRepository).save(orderCapture.capture());
verify(kafkaTemplate).send(eq("rollback-topic"), any());
verify(kafkaProducer).sendMessage(eq("rollback-topic"), any());

assertThat(orderCapture.getValue()).isEqualTo(expectedOrderToSave);
}
Expand Down

0 comments on commit 1566de2

Please sign in to comment.