Skip to content

Commit

Permalink
Add json even deserialization to websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
dvdgeisler committed Nov 29, 2022
1 parent 832879e commit 3c4de79
Showing 1 changed file with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package de.dvdgeisler.iot.dirigera.client.api.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.dvdgeisler.iot.dirigera.client.api.model.Home;
import de.dvdgeisler.iot.dirigera.client.api.model.events.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

Expand All @@ -25,6 +29,7 @@ public class ClientApi extends AbstractClientApi {
private final static Logger log = LoggerFactory.getLogger(ClientApi.class);
private final String hostname;
private final short port;
private final ObjectMapper objectMapper;

public final ClientDeviceApi device;
public final ClientDeviceSetApi deviceSet;
Expand All @@ -41,6 +46,7 @@ public ClientApi(
@Value("${dirigera.hostname}") final String hostname,
@Value("${dirigera.port:8443}") final short port,
final TokenStore tokenStore,
final ObjectMapper objectMapper,
final ClientDeviceApi device,
final ClientDeviceSetApi deviceSet,
final ClientGatewayApi gateway,
Expand All @@ -55,6 +61,7 @@ public ClientApi(
super(String.format("https://%s:%d/v1/", hostname, port), tokenStore);
this.hostname = hostname;
this.port = port;
this.objectMapper = objectMapper;
this.device = device;
this.deviceSet = deviceSet;
this.gateway = gateway;
Expand Down Expand Up @@ -89,14 +96,13 @@ public Mono<Map> dump() {
.bodyToMono(Map.class);
}

public Mono<Void> websocket(final Consumer<String> consumer) {
public Mono<Void> websocket(final Consumer<Event> consumer) {
final String token;
final String authorizationHeader;
final HttpClient httpClient;
final WebSocketClient client;



try {
token = this.tokenStore.getAccessToken();
authorizationHeader = String.format("Bearer %s", token);
Expand All @@ -106,7 +112,15 @@ public Mono<Void> websocket(final Consumer<String> consumer) {
client = new ReactorNettyWebSocketClient(httpClient);
return client.execute(URI.create(String.format("https://%s:%d/v1/", this.hostname, this.port)), session ->
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(WebSocketMessage::getPayload)
.map(DataBuffer::asInputStream)
.flatMap(i -> {
try {
return Flux.just(this.objectMapper.readValue(i, Event.class));
} catch (IOException e) {
return Flux.error(e);
}
})
.doOnNext(consumer)
.repeat()
.then()
Expand Down

0 comments on commit 3c4de79

Please sign in to comment.