Skip to content

Commit

Permalink
implemented changes to allow MapResource to consume events directly f…
Browse files Browse the repository at this point in the history
…rom topic and dispatch to websocket endpoint
  • Loading branch information
my3sons authored and gunnarmorling committed Nov 14, 2020
1 parent 2c434f3 commit f8eb772
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
27 changes: 23 additions & 4 deletions temperature-map/src/main/java/eu/javaland/knative/MapResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@

import io.quarkus.runtime.StartupEvent;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@Path("/")
public class MapResource {

@ConfigProperty(name = "map.read.from.topic", defaultValue = "false")
boolean readFromTopic;

@Inject
MapEndpoint websocketEndpoint;

Expand All @@ -38,6 +44,22 @@ public Response newMeasurement(String request) {

JsonReader reader = Json.createReader(new StringReader(request));
JsonObject measurement = reader.readObject();

websocketEndpoint.onMeasurement(initializeMeasurementFromJson(measurement));

return Response.ok().build();
}

@Incoming("temperature-values")
public void onMeasurement(JsonObject measurement) {
if (!readFromTopic) {
return;
}

websocketEndpoint.onMeasurement(initializeMeasurementFromJson(measurement));
}

private Measurement initializeMeasurementFromJson(JsonObject measurement) {
Measurement temperatureMeasurement = new Measurement();

temperatureMeasurement.stationId = measurement.getInt("stationId");
Expand All @@ -51,10 +73,7 @@ public Response newMeasurement(String request) {
temperatureMeasurement.ts = measurement.getString("ts");
temperatureMeasurement.value = measurement.getJsonNumber("value").doubleValue();
temperatureMeasurement.icon = measurement.getString("icon");

websocketEndpoint.onMeasurement(temperatureMeasurement);

return Response.ok().build();
return temperatureMeasurement;
}

public void onStartup(@Observes StartupEvent se) {
Expand Down
10 changes: 5 additions & 5 deletions temperature-map/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# mp.messaging.incoming.temperature-values.connector=smallrye-kafka
# mp.messaging.incoming.temperature-values.topic=temperature-values-enriched
# mp.messaging.incoming.temperature-values.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
# mp.messaging.incoming.temperature-values.value.deserializer=eu.javaland.knative.JsonpDeserializer
# mp.messaging.incoming.temperature-values.auto.offset.reset=latest
mp.messaging.incoming.temperature-values.connector=smallrye-kafka
mp.messaging.incoming.temperature-values.topic=temperature-values-enriched
mp.messaging.incoming.temperature-values.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.temperature-values.value.deserializer=eu.javaland.knative.JsonpDeserializer
mp.messaging.incoming.temperature-values.auto.offset.reset=latest

0 comments on commit f8eb772

Please sign in to comment.