From 3c4de795700fde7aa134de16164f116248d68fb3 Mon Sep 17 00:00:00 2001 From: David Geisler Date: Tue, 29 Nov 2022 02:40:06 +0100 Subject: [PATCH] Add json even deserialization to websockets --- .../dirigera/client/api/http/ClientApi.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/http/ClientApi.java b/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/http/ClientApi.java index fc88093..4180bbd 100644 --- a/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/http/ClientApi.java +++ b/dirigera-client-api/src/main/java/de/dvdgeisler/iot/dirigera/client/api/http/ClientApi.java @@ -1,9 +1,12 @@ package de.dvdgeisler.iot.dirigera.client.api.http; +import com.fasterxml.jackson.databind.ObjectMapper; import de.dvdgeisler.iot.dirigera.client.api.model.Home; +import de.dvdgeisler.iot.dirigera.client.api.model.events.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -11,6 +14,7 @@ import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.WebSocketClient; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -25,6 +29,7 @@ public class ClientApi extends AbstractClientApi { private final static Logger log = LoggerFactory.getLogger(ClientApi.class); private final String hostname; private final short port; + private final ObjectMapper objectMapper; public final ClientDeviceApi device; public final ClientDeviceSetApi deviceSet; @@ -41,6 +46,7 @@ public ClientApi( @Value("${dirigera.hostname}") final String hostname, @Value("${dirigera.port:8443}") final short port, final TokenStore tokenStore, + final ObjectMapper objectMapper, final ClientDeviceApi device, final ClientDeviceSetApi deviceSet, final ClientGatewayApi gateway, @@ -55,6 +61,7 @@ public ClientApi( super(String.format("https://%s:%d/v1/", hostname, port), tokenStore); this.hostname = hostname; this.port = port; + this.objectMapper = objectMapper; this.device = device; this.deviceSet = deviceSet; this.gateway = gateway; @@ -89,14 +96,13 @@ public Mono dump() { .bodyToMono(Map.class); } - public Mono websocket(final Consumer consumer) { + public Mono websocket(final Consumer consumer) { final String token; final String authorizationHeader; final HttpClient httpClient; final WebSocketClient client; - try { token = this.tokenStore.getAccessToken(); authorizationHeader = String.format("Bearer %s", token); @@ -106,7 +112,15 @@ public Mono websocket(final Consumer consumer) { client = new ReactorNettyWebSocketClient(httpClient); return client.execute(URI.create(String.format("https://%s:%d/v1/", this.hostname, this.port)), session -> session.receive() - .map(WebSocketMessage::getPayloadAsText) + .map(WebSocketMessage::getPayload) + .map(DataBuffer::asInputStream) + .flatMap(i -> { + try { + return Flux.just(this.objectMapper.readValue(i, Event.class)); + } catch (IOException e) { + return Flux.error(e); + } + }) .doOnNext(consumer) .repeat() .then()