From f8eb7729bdf5fb0e8e4b336b27e185778f78040b Mon Sep 17 00:00:00 2001 From: "carey.a.boldenow" Date: Sat, 14 Nov 2020 15:17:15 -0600 Subject: [PATCH] implemented changes to allow MapResource to consume events directly from topic and dispatch to websocket endpoint --- .../java/eu/javaland/knative/MapResource.java | 27 ++++++++++++++++--- .../src/main/resources/application.properties | 10 +++---- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/temperature-map/src/main/java/eu/javaland/knative/MapResource.java b/temperature-map/src/main/java/eu/javaland/knative/MapResource.java index b47dbfb..18e57e0 100644 --- a/temperature-map/src/main/java/eu/javaland/knative/MapResource.java +++ b/temperature-map/src/main/java/eu/javaland/knative/MapResource.java @@ -18,9 +18,15 @@ import io.quarkus.runtime.StartupEvent; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.reactive.messaging.Incoming; + @Path("/") public class MapResource { + @ConfigProperty(name = "map.read.from.topic", defaultValue = "false") + boolean readFromTopic; + @Inject MapEndpoint websocketEndpoint; @@ -38,6 +44,22 @@ public Response newMeasurement(String request) { JsonReader reader = Json.createReader(new StringReader(request)); JsonObject measurement = reader.readObject(); + + websocketEndpoint.onMeasurement(initializeMeasurementFromJson(measurement)); + + return Response.ok().build(); + } + + @Incoming("temperature-values") + public void onMeasurement(JsonObject measurement) { + if (!readFromTopic) { + return; + } + + websocketEndpoint.onMeasurement(initializeMeasurementFromJson(measurement)); + } + + private Measurement initializeMeasurementFromJson(JsonObject measurement) { Measurement temperatureMeasurement = new Measurement(); temperatureMeasurement.stationId = measurement.getInt("stationId"); @@ -51,10 +73,7 @@ public Response newMeasurement(String request) { temperatureMeasurement.ts = measurement.getString("ts"); temperatureMeasurement.value = measurement.getJsonNumber("value").doubleValue(); temperatureMeasurement.icon = measurement.getString("icon"); - - websocketEndpoint.onMeasurement(temperatureMeasurement); - - return Response.ok().build(); + return temperatureMeasurement; } public void onStartup(@Observes StartupEvent se) { diff --git a/temperature-map/src/main/resources/application.properties b/temperature-map/src/main/resources/application.properties index 36983f3..6b5b1be 100644 --- a/temperature-map/src/main/resources/application.properties +++ b/temperature-map/src/main/resources/application.properties @@ -1,5 +1,5 @@ -# mp.messaging.incoming.temperature-values.connector=smallrye-kafka -# mp.messaging.incoming.temperature-values.topic=temperature-values-enriched -# mp.messaging.incoming.temperature-values.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer -# mp.messaging.incoming.temperature-values.value.deserializer=eu.javaland.knative.JsonpDeserializer -# mp.messaging.incoming.temperature-values.auto.offset.reset=latest +mp.messaging.incoming.temperature-values.connector=smallrye-kafka +mp.messaging.incoming.temperature-values.topic=temperature-values-enriched +mp.messaging.incoming.temperature-values.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer +mp.messaging.incoming.temperature-values.value.deserializer=eu.javaland.knative.JsonpDeserializer +mp.messaging.incoming.temperature-values.auto.offset.reset=latest