-
Notifications
You must be signed in to change notification settings - Fork 74
/
Copy pathRedisChatMessagePublisher.java
60 lines (52 loc) · 2.35 KB
/
RedisChatMessagePublisher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.github.rawsanj.messaging;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rawsanj.model.ChatMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.support.atomic.RedisAtomicInteger;
import org.springframework.data.redis.support.atomic.RedisAtomicLong;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.net.InetAddress;
import java.net.UnknownHostException;
import static com.github.rawsanj.config.ChatConstants.MESSAGE_TOPIC;
@Component
@Slf4j
public class RedisChatMessagePublisher {
private final ReactiveStringRedisTemplate reactiveStringRedisTemplate;
private final RedisAtomicInteger chatMessageCounter;
private final RedisAtomicLong activeUserCounter;
private final ObjectMapper objectMapper;
public RedisChatMessagePublisher(ReactiveStringRedisTemplate reactiveStringRedisTemplate, RedisAtomicInteger chatMessageCounter, RedisAtomicLong activeUserCounter, ObjectMapper objectMapper) {
this.reactiveStringRedisTemplate = reactiveStringRedisTemplate;
this.chatMessageCounter = chatMessageCounter;
this.activeUserCounter = activeUserCounter;
this.objectMapper = objectMapper;
}
public Mono<Long> publishChatMessage(String message) {
Integer totalChatMessage = chatMessageCounter.incrementAndGet();
return Mono.fromCallable(() -> {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Error getting hostname.", e);
}
return "localhost";
}).map(hostName -> {
ChatMessage chatMessage = new ChatMessage(totalChatMessage, message, hostName, activeUserCounter.get());
String chatString = "EMPTY_MESSAGE";
try {
chatString = objectMapper.writeValueAsString(chatMessage);
} catch (JsonProcessingException e) {
log.error("Error converting ChatMessage {} into string", chatMessage, e);
}
return chatString;
}).flatMap(chatString -> {
// Publish Message to Redis Channels
return reactiveStringRedisTemplate.convertAndSend(MESSAGE_TOPIC, chatString)
.doOnSuccess(aLong -> log.debug("Total of {} Messages published to Redis Topic.", totalChatMessage))
.doOnError(throwable -> log.error("Error publishing message.", throwable));
});
}
}