From 6f6691e7459ab8891a4afa3f96e9365ece4b7135 Mon Sep 17 00:00:00 2001 From: Matej Vasek Date: Wed, 9 Dec 2020 22:27:46 +0100 Subject: [PATCH] Expose CloudEvents Signed-off-by: Matej Vasek --- .../org/acme/funqy/cloudevent/FunqyTest.java | 1 + .../io/quarkus/funqy/test/BinaryPayload.java | 19 + .../quarkus/funqy/test/BinaryPayloadTest.java | 79 ++++ .../funqy/test/ExposedCloudEventTest.java | 210 +++++++++ .../funqy/test/ExposedCloudEvents.java | 119 +++++ .../io/quarkus/funqy/test/MappingTest.java | 6 + .../knative/events/AbstractCloudEvent.java | 19 + .../funqy/knative/events/CloudEvent.java | 17 +- .../knative/events/CloudEventBuilder.java | 198 ++++++++ .../knative/events/HeaderCloudEventImpl.java | 139 +++++- .../knative/events/JsonCloudEventImpl.java | 181 ++++++- .../events/KnativeEventsBindingRecorder.java | 46 +- .../knative/events/VertxRequestHandler.java | 446 ++++++++++++------ .../funqy/runtime/FunctionInvoker.java | 2 +- 14 files changed, 1307 insertions(+), 175 deletions(-) create mode 100644 extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/BinaryPayload.java create mode 100644 extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/BinaryPayloadTest.java create mode 100644 extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/ExposedCloudEventTest.java create mode 100644 extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/ExposedCloudEvents.java create mode 100644 extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/AbstractCloudEvent.java create mode 100644 extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/CloudEventBuilder.java diff --git a/devtools/platform-descriptor-json/src/main/resources/codestarts/quarkus/singleton-examples/funqy-knative-events-example/java/src/test/java/org/acme/funqy/cloudevent/FunqyTest.java b/devtools/platform-descriptor-json/src/main/resources/codestarts/quarkus/singleton-examples/funqy-knative-events-example/java/src/test/java/org/acme/funqy/cloudevent/FunqyTest.java index d8c0d6595ce38..38b42eacc8236 100644 --- a/devtools/platform-descriptor-json/src/main/resources/codestarts/quarkus/singleton-examples/funqy-knative-events-example/java/src/test/java/org/acme/funqy/cloudevent/FunqyTest.java +++ b/devtools/platform-descriptor-json/src/main/resources/codestarts/quarkus/singleton-examples/funqy-knative-events-example/java/src/test/java/org/acme/funqy/cloudevent/FunqyTest.java @@ -17,6 +17,7 @@ public class FunqyTest { @Test public void testCloudEvent() { RestAssured.given().contentType("application/json") + .header("ce-specversion", "1.0") .header("ce-id", UUID.randomUUID().toString()) .header("ce-type", "myCloudEventGreeting") .header("ce-source", "test") diff --git a/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/BinaryPayload.java b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/BinaryPayload.java new file mode 100644 index 0000000000000..37e8c0ccfab79 --- /dev/null +++ b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/BinaryPayload.java @@ -0,0 +1,19 @@ +package io.quarkus.funqy.test; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import io.quarkus.funqy.Funq; +import io.quarkus.funqy.knative.events.CloudEventMapping; + +public class BinaryPayload { + + @Funq + @CloudEventMapping(trigger = "test-type") + public byte[] doubleInt32BE(byte[] data) { + int i = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN).getInt(); + byte[] result = new byte[4]; + ByteBuffer.wrap(result).order(ByteOrder.BIG_ENDIAN).putInt(i * 2); + return result; + } +} diff --git a/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/BinaryPayloadTest.java b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/BinaryPayloadTest.java new file mode 100644 index 0000000000000..5dc4b26e11e48 --- /dev/null +++ b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/BinaryPayloadTest.java @@ -0,0 +1,79 @@ +package io.quarkus.funqy.test; + +import static org.hamcrest.Matchers.equalTo; + +import java.util.Base64; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class BinaryPayloadTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(BinaryPayload.class)); + + private static final byte[] IN = new byte[] { 0x12, 0x34, 0x56, 0x78 }; + private static final String IN_AS_BASE_64 = Base64.getEncoder().encodeToString(IN); + private static final byte[] OUT = new byte[] { 0x24, 0x68, (byte) 0xAC, (byte) 0xf0 }; + private static final String OUT_AS_BASE_64 = Base64.getEncoder().encodeToString(OUT); + + @Test + void testBinaryEncoding() { + + byte[] ba = RestAssured.given().contentType("application/octet-stream") + .header("ce-specversion", "1.0") + .header("ce-id", "test-id") + .header("ce-type", "test-type") + .header("ce-source", "test-source") + .body(IN) + .post("/") + .then() + .statusCode(200) + .extract().response().getBody().asByteArray(); + + Assertions.assertArrayEquals(OUT, ba); + } + + @Test + void testStructuredEncodingV1() { + RestAssured.given().contentType("application/cloudevents+json") + .body(STRUCTURED_ENCODED_EVENT_V1_1_BODY) + .post("/") + .then() + .statusCode(200) + .body("data_base64", equalTo(OUT_AS_BASE_64)); + } + + @Test + void testStructuredEncodingV03() { + RestAssured.given().contentType("application/cloudevents+json") + .body(STRUCTURED_ENCODED_EVENT_V03_BODY) + .post("/") + .then() + .statusCode(200) + .body("datacontentencoding", equalTo("base64")) + .body("data", equalTo(OUT_AS_BASE_64)); + } + + private static final String STRUCTURED_ENCODED_EVENT_V1_1_BODY = "{ \"id\" : \"test-id\", " + + " \"specversion\": \"1.1\", " + + " \"source\": \"test-source\", " + + " \"type\": \"test-type\", " + + " \"data_base64\": \"" + IN_AS_BASE_64 + "\" " + + "}"; + + private static final String STRUCTURED_ENCODED_EVENT_V03_BODY = "{ \"id\" : \"test-id\", " + + " \"specversion\": \"0.3\", " + + " \"source\": \"test-source\", " + + " \"type\": \"test-type\", " + + " \"datacontentencoding\": \"base64\", " + + " \"data\": \"" + IN_AS_BASE_64 + "\" " + + "}"; +} diff --git a/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/ExposedCloudEventTest.java b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/ExposedCloudEventTest.java new file mode 100644 index 0000000000000..af7873a7438a3 --- /dev/null +++ b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/ExposedCloudEventTest.java @@ -0,0 +1,210 @@ +package io.quarkus.funqy.test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.restassured.specification.RequestSpecification; + +public class ExposedCloudEventTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(ExposedCloudEvents.class)); + + @Test + public void testVanillaHttp() { + // when a function handles CloudEvent explicitly, vanilla HTTP is considered to be a bad request. + RestAssured.given().contentType("application/json") + .body("{}") + .post("/doubleIt") + .then() + .statusCode(400); + } + + @Test + public void testCloudEventAttributeDefaultsForStructuredEncoding() { + String event = "{ \"id\" : \"test-id\", " + + " \"specversion\": \"1.0\", " + + " \"source\": \"test-source\", " + + " \"type\": \"test-defaults\" " + + "}"; + RestAssured.given().contentType("application/cloudevents+json") + .body(event) + .post("/") + .then() + .statusCode(200) + .body("specversion", equalTo("1.0")) + .body("id", notNullValue()) + .body("type", equalTo("default-type")) + .body("source", equalTo("default-source")); + } + + @Test + public void testCloudEventAttributeDefaultsForBinaryEncoding() { + RestAssured.given() + .header("ce-id", "test-id") + .header("ce-specversion", "1.0") + .header("ce-type", "test-defaults") + .header("ce-source", "test-source") + .post() + .then() + .statusCode(204) + .header("ce-specversion", equalTo("1.0")) + .header("ce-id", notNullValue()) + .header("ce-type", equalTo("default-type")) + .header("ce-source", equalTo("default-source")); + } + + @Test + public void testGenericInput() { + RestAssured.given().contentType("application/json") + .header("ce-id", "test-id") + .header("ce-specversion", "1.0") + .header("ce-type", "test-generics") + .header("ce-source", "test-source") + .body("[{\"i\" : 1}, {\"i\" : 2}, {\"i\" : 3}]") + .then() + .statusCode(200) + .body(equalTo("6")); + } + + @ParameterizedTest + @MethodSource("provideBinaryEncodingTestArgs") + public void testBinaryEncoding(Map headers, String specversion, String dataSchemaHdrName) { + + RequestSpecification req = RestAssured.given().contentType("application/json"); + + for (Map.Entry h : headers.entrySet()) { + req = req.header(h.getKey(), h.getValue()); + } + + req.body(BINARY_ENCODED_EVENT_BODY) + .post("/") + .then() + .statusCode(200) + .header("ce-specversion", equalTo(specversion)) + .header("ce-id", equalTo("double-it-id")) + .header("ce-type", equalTo("double-it-type")) + .header("ce-source", equalTo("/OfDoubleIt")) + .header(dataSchemaHdrName, equalTo("dataschema-server")) + .header("ce-extserver", equalTo("ext-server-val")) + .body("i", equalTo(42)) + .body("s", equalTo("abcabc")); + } + + @ParameterizedTest + @MethodSource("provideStructuredEncodingTestArgs") + public void testStructuredEncoding(String event, String specversion, String dataSchemaFieldName) { + RestAssured.given().contentType("application/cloudevents+json") + .body(event) + .post("/") + .then() + .statusCode(200) + .body("specversion", equalTo(specversion)) + .body("id", equalTo("double-it-id")) + .body("type", equalTo("double-it-type")) + .body("source", equalTo("/OfDoubleIt")) + .body(dataSchemaFieldName, equalTo("dataschema-server")) + .body("extserver", equalTo("ext-server-val")) + .body("data.i", equalTo(42)) + .body("data.s", equalTo("abcabc")); + } + + static { + Map common = new HashMap<>(); + common.put("ce-id", "test-id"); + common.put("ce-type", "test-type"); + common.put("ce-source", "/OfTest"); + common.put("ce-subject", "test-subj"); + common.put("ce-time", "2018-04-05T17:31:00Z"); + common.put("ce-extclient", "ext-client-val"); + + Map v1 = new HashMap<>(common); + v1.put("ce-specversion", "1.0"); + v1.put("ce-dataschema", "test-dataschema-client"); + BINARY_ENCODED_EVENT_V1_HEADERS = Collections.unmodifiableMap(v1); + + Map v1_1 = new HashMap<>(common); + v1_1.put("ce-specversion", "1.1"); + v1_1.put("ce-dataschema", "test-dataschema-client"); + BINARY_ENCODED_EVENT_V1_1_HEADERS = Collections.unmodifiableMap(v1_1); + + Map v03 = new HashMap<>(common); + v03.put("ce-specversion", "0.3"); + v03.put("ce-schemaurl", "test-dataschema-client"); + BINARY_ENCODED_EVENT_V03_HEADERS = Collections.unmodifiableMap(v03); + } + + public static final Map BINARY_ENCODED_EVENT_V1_HEADERS; + public static final Map BINARY_ENCODED_EVENT_V1_1_HEADERS; + public static final Map BINARY_ENCODED_EVENT_V03_HEADERS; + + private static Stream provideBinaryEncodingTestArgs() { + return Stream. builder() + .add(Arguments.arguments(BINARY_ENCODED_EVENT_V1_HEADERS, "1.0", "ce-dataschema")) + .add(Arguments.arguments(BINARY_ENCODED_EVENT_V1_1_HEADERS, "1.1", "ce-dataschema")) + .add(Arguments.arguments(BINARY_ENCODED_EVENT_V03_HEADERS, "0.3", "ce-schemaurl")) + .build(); + } + + public static final String BINARY_ENCODED_EVENT_BODY = " { \"i\" : 21, \"s\" : \"abc\" } "; + + static final String STRUCTURED_ENCODED_EVENT_V1_BODY = "{ \"id\" : \"test-id\", " + + " \"specversion\": \"1.0\", " + + " \"source\": \"/OfTest\", " + + " \"subject\": \"test-subj\", " + + " \"time\": \"2018-04-05T17:31:00Z\", " + + " \"type\": \"test-type\", " + + " \"extclient\": \"ext-client-val\", " + + " \"dataschema\": \"test-dataschema-client\", " + + " \"datacontenttype\": \"application/json\", " + + " \"data\": { \"i\" : 21, \"s\" : \"abc\" } " + + "}"; + + static final String STRUCTURED_ENCODED_EVENT_V1_1_BODY = "{ \"id\" : \"test-id\", " + + " \"specversion\": \"1.1\", " + + " \"source\": \"/OfTest\", " + + " \"subject\": \"test-subj\", " + + " \"time\": \"2018-04-05T17:31:00Z\", " + + " \"type\": \"test-type\", " + + " \"extclient\": \"ext-client-val\", " + + " \"dataschema\": \"test-dataschema-client\", " + + " \"datacontenttype\": \"application/json\", " + + " \"data\": { \"i\" : 21, \"s\" : \"abc\" } " + + "}"; + + static final String STRUCTURED_ENCODED_EVENT_V03_BODY = "{ \"id\" : \"test-id\", " + + " \"specversion\": \"0.3\", " + + " \"source\": \"/OfTest\", " + + " \"subject\": \"test-subj\", " + + " \"time\": \"2018-04-05T17:31:00Z\", " + + " \"type\": \"test-type\", " + + " \"extclient\": \"ext-client-val\", " + + " \"schemaurl\": \"test-dataschema-client\", " + + " \"datacontenttype\": \"application/json\", " + + " \"data\": { \"i\" : 21, \"s\" : \"abc\" } " + + "}"; + + private static Stream provideStructuredEncodingTestArgs() { + return Stream. builder() + .add(Arguments.arguments(STRUCTURED_ENCODED_EVENT_V1_BODY, "1.0", "dataschema")) + .add(Arguments.arguments(STRUCTURED_ENCODED_EVENT_V1_1_BODY, "1.1", "dataschema")) + .add(Arguments.arguments(STRUCTURED_ENCODED_EVENT_V03_BODY, "0.3", "schemaurl")) + .build(); + } +} diff --git a/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/ExposedCloudEvents.java b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/ExposedCloudEvents.java new file mode 100644 index 0000000000000..b44e6ff9a6ad1 --- /dev/null +++ b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/ExposedCloudEvents.java @@ -0,0 +1,119 @@ +package io.quarkus.funqy.test; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import io.quarkus.funqy.Funq; +import io.quarkus.funqy.knative.events.CloudEvent; +import io.quarkus.funqy.knative.events.CloudEventBuilder; +import io.quarkus.funqy.knative.events.CloudEventMapping; + +public class ExposedCloudEvents { + + @Funq + @CloudEventMapping(trigger = "test-type") + public CloudEvent doubleIt(CloudEvent event) { + + if (event == null) + throw new RuntimeException("Event is null!"); + if (!event.id().equals("test-id")) + throw new RuntimeException("Bad id!"); + if (!event.source().equals("/OfTest")) + throw new RuntimeException("Bad source!"); + if (!event.type().equals("test-type")) + throw new RuntimeException("Bad type!"); + if (!event.subject().equals("test-subj")) + throw new RuntimeException("Bad subject!"); + if (!event.dataSchema().equals("test-dataschema-client")) + throw new RuntimeException("Bad dataschema!"); + if (!event.extensions().equals(Collections.singletonMap("extclient", "ext-client-val"))) + throw new RuntimeException("Bad extensions!"); + if (event.time() == null) + throw new RuntimeException("Bad time!"); + + TestBean inBean = event.data(); + return CloudEventBuilder.create() + .specVersion(event.specVersion()) + .id("double-it-id") + .type("double-it-type") + .source("/OfDoubleIt") + .extensions(Collections.singletonMap("extserver", "ext-server-val")) + .dataSchema("dataschema-server") + .build(new TestBean(inBean.getI() * 2, inBean.getS() + inBean.getS())); + } + + @Funq + @CloudEventMapping(trigger = "test-defaults", responseSource = "default-source", responseType = "default-type") + public CloudEvent withDefaults(CloudEvent event) { + return CloudEventBuilder.create().build(); + } + + @Funq + @CloudEventMapping(trigger = "test-generics") + public CloudEvent sum(CloudEvent> event) { + Integer data = event.data().stream() + .map(TestBean::getI) + .reduce((a, b) -> a + b) + .orElse(0); + + return CloudEventBuilder.create() + .specVersion(event.specVersion()) + .id("test-sum-id") + .type("test-sum-type") + .build(data); + } + + public static class TestBean implements Serializable { + private int i; + private String s; + + public TestBean() { + } + + public TestBean(int i, String s) { + this.i = i; + this.s = s; + } + + public int getI() { + return i; + } + + public void setI(int i) { + this.i = i; + } + + public String getS() { + return s; + } + + public void setS(String s) { + this.s = s; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TestBean testBean = (TestBean) o; + return i == testBean.i && Objects.equals(s, testBean.s); + } + + @Override + public int hashCode() { + return Objects.hash(s, i); + } + + @Override + public String toString() { + return "TestBean{" + + "i=" + i + + ", s='" + s + '\'' + + '}'; + } + } +} diff --git a/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/MappingTest.java b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/MappingTest.java index 184f22956839f..d1ef5a25da77f 100644 --- a/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/MappingTest.java +++ b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/MappingTest.java @@ -26,6 +26,7 @@ public class MappingTest { public void testMapping() { RestAssured.given().contentType("application/json") .header("ce-id", UUID.randomUUID().toString()) + .header("ce-specversion", "1.0") .header("ce-type", "tolower") .header("ce-source", "test") .body("\"HELLO\"") @@ -41,6 +42,7 @@ public void testMapping() { public void testAnnotatedMapping() { RestAssured.given().contentType("application/json") .header("ce-id", UUID.randomUUID().toString()) + .header("ce-specversion", "1.0") .header("ce-type", "echo") .header("ce-source", "test") .body("\"HELLO\"") @@ -59,6 +61,7 @@ public void testNoopDefaultMapping() { EncoderConfig.encoderConfig().appendDefaultContentCharsetToContentTypeIfUndefined(false))) .contentType("") .header("ce-id", UUID.randomUUID().toString()) + .header("ce-specversion", "1.0") .header("ce-type", "noop") .header("ce-source", "test") .post("/") @@ -75,6 +78,7 @@ public void testNoopDefaultMappingGet() { EncoderConfig.encoderConfig().appendDefaultContentCharsetToContentTypeIfUndefined(false))) .contentType("") .header("ce-id", UUID.randomUUID().toString()) + .header("ce-specversion", "1.0") .header("ce-type", "noop") .header("ce-source", "test") .get("/") @@ -101,6 +105,7 @@ public void testNoopGet() { public void testDefaultMapping() { RestAssured.given().contentType("application/json") .header("ce-id", UUID.randomUUID().toString()) + .header("ce-specversion", "1.0") .header("ce-type", "doubleIt") .header("ce-source", "test") .body("2") @@ -128,6 +133,7 @@ public void testHttp() { public void testNoTrigger() { RestAssured.given().contentType("application/json") .header("ce-id", UUID.randomUUID().toString()) + .header("ce-specversion", "1.0") .header("ce-type", "foo") .header("ce-source", "test") .body("2") diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/AbstractCloudEvent.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/AbstractCloudEvent.java new file mode 100644 index 0000000000000..43f3f49afebb9 --- /dev/null +++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/AbstractCloudEvent.java @@ -0,0 +1,19 @@ +package io.quarkus.funqy.knative.events; + +public abstract class AbstractCloudEvent implements CloudEvent { + @Override + public String toString() { + return "CloudEvent{" + + "specVersion='" + specVersion() + '\'' + + ", id='" + id() + '\'' + + ", type='" + type() + '\'' + + ", source='" + source() + '\'' + + ", subject='" + subject() + '\'' + + ", time=" + time() + + ", extensions=" + extensions() + + ", dataSchema=" + dataSchema() + + ", dataContentType='" + dataContentType() + '\'' + + ", data=" + data() + + '}'; + } +} diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/CloudEvent.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/CloudEvent.java index cd96fb91fecdf..b0bba9bb74b95 100644 --- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/CloudEvent.java +++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/CloudEvent.java @@ -1,19 +1,32 @@ package io.quarkus.funqy.knative.events; import java.time.OffsetDateTime; +import java.util.Map; /** - * Cloud event. Represents only the headers. No data. + * CloudEvent. * */ -public interface CloudEvent { +public interface CloudEvent { + String id(); String specVersion(); String source(); + String type(); + String subject(); OffsetDateTime time(); + + Map extensions(); + + String dataSchema(); + + String dataContentType(); + + T data(); + } diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/CloudEventBuilder.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/CloudEventBuilder.java new file mode 100644 index 0000000000000..52af4bd24bb3f --- /dev/null +++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/knative/events/CloudEventBuilder.java @@ -0,0 +1,198 @@ +package io.quarkus.funqy.knative.events; + +import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.Map; + +public class CloudEventBuilder { + private String specVersion; + private String id; + private String type; + private String source; + private String subject; + private OffsetDateTime time; + private String dataSchema; + private Map extensions; + + private CloudEventBuilder() { + } + + public static CloudEventBuilder create() { + return new CloudEventBuilder(); + } + + public CloudEventBuilder specVersion(String specVersion) { + if ((specVersion.charAt(0) != '0' && specVersion.charAt(0) != '1') || specVersion.charAt(1) != '.') { + throw new IllegalArgumentException("Only supported major versions are 0 and 1."); + } + this.specVersion = specVersion; + return this; + } + + public CloudEventBuilder id(String id) { + this.id = id; + return this; + } + + public CloudEventBuilder type(String type) { + this.type = type; + return this; + } + + public CloudEventBuilder source(String source) { + this.source = source; + return this; + } + + public CloudEventBuilder subject(String subject) { + this.subject = subject; + return this; + } + + public CloudEventBuilder time(OffsetDateTime time) { + this.time = time; + return this; + } + + public CloudEventBuilder dataSchema(String dataSchema) { + this.dataSchema = dataSchema; + return this; + } + + public CloudEventBuilder extensions(Map extensions) { + this.extensions = extensions; + return this; + } + + public CloudEvent build(byte[] data, String dataContentType) { + + return new SimpleCloudEvent(specVersion, + id, + type, + source, + subject, + time, + extensions, + dataSchema, + dataContentType, + data); + } + + public CloudEvent build(T data) { + return new SimpleCloudEvent(specVersion, + id, + type, + source, + subject, + time, + extensions, + dataSchema, + "application/json", + data); + } + + public CloudEvent build() { + return new SimpleCloudEvent(specVersion, + id, + type, + source, + subject, + time, + extensions, + dataSchema, + null, + null); + } + + private static final class SimpleCloudEvent extends AbstractCloudEvent implements CloudEvent { + private final String specVersion; + private final String id; + private final String type; + private final String source; + private final String subject; + private final OffsetDateTime time; + private final Map extensions; + private final String dataSchema; + private final String dataContentType; + private final T data; + + SimpleCloudEvent(String specVersion, + String id, + String type, + String source, + String subject, + OffsetDateTime time, + Map extensions, + String dataSchema, + String dataContentType, + T data) { + + if (extensions == null) { + this.extensions = Collections.emptyMap(); + } else { + this.extensions = Collections.unmodifiableMap(extensions); + } + + this.specVersion = specVersion; + this.id = id; + this.type = type; + this.source = source; + this.subject = subject; + this.time = time; + this.dataSchema = dataSchema; + this.dataContentType = dataContentType; + this.data = data; + } + + @Override + public String id() { + return id; + } + + @Override + public String specVersion() { + return specVersion; + } + + @Override + public String source() { + return source; + } + + @Override + public String type() { + return type; + } + + @Override + public String subject() { + return subject; + } + + @Override + public OffsetDateTime time() { + return time; + } + + @Override + public String dataSchema() { + return dataSchema; + } + + @Override + public Map extensions() { + return extensions; + } + + @Override + public String dataContentType() { + return dataContentType; + } + + @Override + public T data() { + return data; + } + + } +} diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/HeaderCloudEventImpl.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/HeaderCloudEventImpl.java index f3b90e559587e..3aa6d13674700 100644 --- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/HeaderCloudEventImpl.java +++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/HeaderCloudEventImpl.java @@ -1,27 +1,55 @@ package io.quarkus.funqy.runtime.bindings.knative.events; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.reflect.Type; import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; + +import io.quarkus.funqy.knative.events.AbstractCloudEvent; import io.quarkus.funqy.knative.events.CloudEvent; -import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; -class HeaderCloudEventImpl implements CloudEvent { +class HeaderCloudEventImpl extends AbstractCloudEvent implements CloudEvent { String id; String specVersion; String source; + String type; + String subject; OffsetDateTime time; + Map extensions; + + String dataSchema; + String dataContentType; + T data; - final HttpServerRequest request; + final MultiMap headers; + final Buffer buffer; + final Type dataType; + final ObjectMapper mapper; + private ObjectReader reader; - HeaderCloudEventImpl(HttpServerRequest request) { - this.request = request; + HeaderCloudEventImpl(MultiMap headers, Buffer buffer, Type dataType, ObjectMapper mapper, ObjectReader reader) { + this.headers = headers; + this.buffer = buffer; + this.dataType = dataType; + this.mapper = mapper; + this.reader = reader; } @Override public String id() { if (id == null) { - id = this.request.getHeader("ce-id"); + id = headers.get("ce-id"); } return id; @@ -30,7 +58,7 @@ public String id() { @Override public String specVersion() { if (specVersion == null) { - specVersion = this.request.getHeader("ce-specversion"); + this.specVersion = headers.get("ce-specversion"); } return specVersion; @@ -38,17 +66,26 @@ public String specVersion() { @Override public String source() { - if (source == null) { - source = this.request.getHeader("ce-source"); + if (source == null && headers.contains("ce-source")) { + source = headers.get("ce-source"); } return source; } + @Override + public String type() { + if (type == null) { + type = headers.get("ce-type"); + } + + return type; + } + @Override public String subject() { if (subject == null) { - subject = this.request.getHeader("ce-subject"); + subject = headers.get("ce-subject"); } return subject; @@ -57,7 +94,7 @@ public String subject() { @Override public OffsetDateTime time() { if (time == null) { - String t = this.request.getHeader("ce-time"); + String t = headers.get("ce-time"); if (t != null) { time = OffsetDateTime.parse(t); } @@ -65,4 +102,84 @@ public OffsetDateTime time() { return time; } + + private static final Set reservedHeaders; + static { + Set ra = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + ra.add("ce-specversion"); + ra.add("ce-id"); + ra.add("ce-source"); + ra.add("ce-type"); + + ra.add("Content-Type"); + ra.add("ce-subject"); + ra.add("ce-time"); + + ra.add("ce-datacontentencoding"); + ra.add("ce-schemaurl"); + + ra.add("ce-dataschema"); + + reservedHeaders = Collections.unmodifiableSet(ra); + } + + private static boolean isCEHeader(String value) { + return (value.charAt(0) == 'C' || value.charAt(0) == 'c') && + (value.charAt(1) == 'E' || value.charAt(1) == 'e') && + value.charAt(2) == '-'; + } + + @Override + public Map extensions() { + if (extensions == null) { + extensions = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + for (Map.Entry entry : headers) { + if (isCEHeader(entry.getKey()) && !reservedHeaders.contains(entry.getKey())) { + extensions.put(entry.getKey().substring(3), entry.getValue()); + } + } + extensions = Collections.unmodifiableMap(extensions); + } + return extensions; + } + + @Override + public String dataSchema() { + if (dataSchema == null) { + String dsName = specVersion().charAt(0) == '0' ? "ce-schemaurl" : "ce-dataschema"; + dataSchema = headers.get(dsName); + } + return dataSchema; + } + + @Override + public String dataContentType() { + if (dataContentType == null) { + dataContentType = headers.get("Content-Type"); + } + + return dataContentType; + } + + @Override + public T data() { + if (data != null) { + return data; + } + if (dataContentType() != null && dataContentType().startsWith("application/json") && !byte[].class.equals(dataType)) { + try { + data = reader.readValue(buffer.getBytes()); + return data; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else if (byte[].class.equals(dataType)) { + data = (T) buffer.getBytes(); + return data; + } else { + String msg = String.format("Don't know how to get event data (dataContentType: '%s', javaType: '%s').", + dataContentType(), dataType.getTypeName()); + throw new RuntimeException(msg); + } + } } diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/JsonCloudEventImpl.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/JsonCloudEventImpl.java index bbaebd95fc956..098bc1b6047ab 100644 --- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/JsonCloudEventImpl.java +++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/JsonCloudEventImpl.java @@ -1,22 +1,50 @@ package io.quarkus.funqy.runtime.bindings.knative.events; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.reflect.Type; import java.time.OffsetDateTime; - +import java.util.Base64; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import io.quarkus.funqy.knative.events.AbstractCloudEvent; import io.quarkus.funqy.knative.events.CloudEvent; -class JsonCloudEventImpl implements CloudEvent { +class JsonCloudEventImpl extends AbstractCloudEvent implements CloudEvent { String id; String specVersion; String source; + String type; + String subject; OffsetDateTime time; + Map extensions; + + String dataSchema; + String dataContentType; + T data; final JsonNode event; + final ObjectMapper mapper; + final Type dataType; + private ObjectReader reader; + private ObjectWriter writer; - public JsonCloudEventImpl(JsonNode event) { + public JsonCloudEventImpl(JsonNode event, Type dataType, ObjectMapper mapper, ObjectReader reader) { this.event = event; + this.mapper = mapper; + this.dataType = dataType; + this.reader = reader; } @Override @@ -34,8 +62,9 @@ public String id() { public String specVersion() { if (specVersion == null) { JsonNode specVersion = event.get("specversion"); - if (specVersion != null) + if (specVersion != null) { this.specVersion = specVersion.asText(); + } } return specVersion; @@ -43,21 +72,31 @@ public String specVersion() { @Override public String source() { - if (source == null) { - JsonNode source = event.get("source"); - if (source != null) - this.source = source.asText(); + if (source == null && event.has("source")) { + this.source = event.get("source").asText(); } return source; } + @Override + public String type() { + if (type == null) { + JsonNode source = event.get("type"); + if (source != null) + this.type = source.asText(); + } + + return type; + } + @Override public String subject() { if (subject == null) { JsonNode subject = event.get("subject"); - if (subject != null) + if (subject != null) { this.subject = subject.asText(); + } } return subject; @@ -74,4 +113,128 @@ public OffsetDateTime time() { return time; } + + private static final Set reservedAttributes; + static { + Set ra = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + ra.add("specversion"); + ra.add("id"); + ra.add("source"); + ra.add("type"); + + ra.add("datacontenttype"); + ra.add("subject"); + ra.add("time"); + + ra.add("datacontentencoding"); + ra.add("schemaurl"); + ra.add("dataschema"); + + ra.add("data"); + + reservedAttributes = Collections.unmodifiableSet(ra); + } + + @Override + public Map extensions() { + + if (extensions == null) { + extensions = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + event.fields().forEachRemaining(e -> { + if (!reservedAttributes.contains(e.getKey())) { + extensions.put(e.getKey(), e.getValue().textValue()); + } + }); + extensions = Collections.unmodifiableMap(extensions); + } + + return extensions; + } + + @Override + public String dataSchema() { + if (dataSchema == null) { + String dsName = specVersion().charAt(0) == '0' ? "schemaurl" : "dataschema"; + JsonNode dataSchema = event.get(dsName); + if (dataSchema != null) { + this.dataSchema = dataSchema.asText(); + } + } + + return dataSchema; + } + + @Override + public String dataContentType() { + if (dataContentType == null) { + JsonNode dataContentType = event.get("datacontenttype"); + if (dataContentType != null) { + this.dataContentType = dataContentType.asText(); + } + } + + return dataContentType; + } + + @Override + public T data() { + if (data != null) { + return data; + } + if (dataContentType() != null && dataContentType().startsWith("application/json") && !byte[].class.equals(dataType)) { + try { + data = reader.readValue(event.get("data")); + return data; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else if (byte[].class.equals(dataType)) { + try { + switch (specVersion().charAt(0)) { + case '0': + boolean isBase64 = false; + if (event.has("datacontentencoding")) { + String dce = event.get("datacontentencoding").asText(); + if ("base64".equals(dce)) { + isBase64 = true; + } else { + throw new RuntimeException("Cannot deserialize data for data-content-encoding: '" + dce + "'."); + } + } + if (isBase64) { + if (event.has("data")) { + String txt = event.get("data").asText(); + data = (T) Base64.getDecoder().decode(txt); + return data; + } + } else { + if (event.has("data")) { + data = (T) mapper.writeValueAsBytes(event.get("data")); + return data; + } + } + case '1': + if (event.has("data")) { + data = (T) mapper.writeValueAsBytes(event.get("data")); + return data; + } else if (event.has("data_base64")) { + String txt = event.get("data_base64").asText(); + data = (T) Base64.getDecoder().decode(txt); + return data; + } else { + return null; + } + default: + throw new RuntimeException("Cannot deserialize data for spec-version: '" + specVersion() + "'."); + } + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } else { + String msg = String.format("Don't know how to get event data (dataContentType: '%s', javaType: '%s').", + dataContentType(), dataType.getTypeName()); + throw new RuntimeException(msg); + } + } + } diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java index 43ebc1760afb8..7028d2b2f82d1 100644 --- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java +++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java @@ -1,6 +1,8 @@ package io.quarkus.funqy.runtime.bindings.knative.events; import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; @@ -17,7 +19,9 @@ import io.quarkus.arc.Arc; import io.quarkus.arc.InstanceHandle; +import io.quarkus.arc.impl.Reflections; import io.quarkus.arc.runtime.BeanContainer; +import io.quarkus.funqy.knative.events.CloudEvent; import io.quarkus.funqy.knative.events.CloudEventMapping; import io.quarkus.funqy.runtime.FunctionConstructor; import io.quarkus.funqy.runtime.FunctionInvoker; @@ -44,6 +48,10 @@ public class KnativeEventsBindingRecorder { public static final String RESPONSE_TYPE = "response.cloud.event.type"; public static final String RESPONSE_SOURCE = "response.cloud.event.source"; + public static final String INPUT_CE_DATA_TYPE = "io.quarkus.funqy.knative.events.INPUT_CE_DATA_TYPE"; + public static final String OUTPUT_CE_DATA_TYPE = "io.quarkus.funqy.knative.events.OUTPUT_CE_DATA_TYPE"; + public static final String DATA_OBJECT_READER = ObjectReader.class.getName() + "_DATA_OBJECT_READER"; + public static final String DATA_OBJECT_WRITER = ObjectWriter.class.getName() + "_DATA_OBJECT_WRITER"; public void init() { typeTriggers = new HashMap<>(); @@ -61,16 +69,44 @@ public void init() { } if (invoker.hasInput()) { - JavaType javaInputType = objectMapper.constructType(invoker.getInputType()); + Type inputType = invoker.getInputType(); + + if (CloudEvent.class.equals(Reflections.getRawType(inputType))) { + if (inputType instanceof ParameterizedType) { + Type[] params = ((ParameterizedType) inputType).getActualTypeArguments(); + if (params.length == 1) { + inputType = params[0]; + invoker.getBindingContext().put(INPUT_CE_DATA_TYPE, inputType); + } + } else { + throw new RuntimeException("When using CloudEvent<> generic parameter must be used."); + } + } + + JavaType javaInputType = objectMapper.constructType(inputType); ObjectReader reader = objectMapper.readerFor(javaInputType); - invoker.getBindingContext().put(ObjectReader.class.getName(), reader); - QueryReader queryReader = queryMapper.readerFor(invoker.getInputType()); + invoker.getBindingContext().put(DATA_OBJECT_READER, reader); + QueryReader queryReader = queryMapper.readerFor(inputType); invoker.getBindingContext().put(QueryReader.class.getName(), queryReader); } if (invoker.hasOutput()) { - JavaType outputJavaType = objectMapper.constructType(invoker.getOutputType()); + Type outputType = invoker.getOutputType(); + + if (CloudEvent.class.equals(Reflections.getRawType(outputType))) { + if (outputType instanceof ParameterizedType) { + Type[] params = ((ParameterizedType) outputType).getActualTypeArguments(); + if (params.length == 1) { + outputType = params[0]; + invoker.getBindingContext().put(OUTPUT_CE_DATA_TYPE, outputType); + } + } else { + throw new RuntimeException("When using CloudEvent<> generic parameter must be used."); + } + } + + JavaType outputJavaType = objectMapper.constructType(outputType); ObjectWriter writer = objectMapper.writerFor(outputJavaType); - invoker.getBindingContext().put(ObjectWriter.class.getName(), writer); + invoker.getBindingContext().put(DATA_OBJECT_WRITER, writer); String functionName = invoker.getName(); if (annotation != null && !annotation.responseType().isEmpty()) { diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java index 6083249330c91..0a9f59d45c1b4 100644 --- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java +++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java @@ -1,13 +1,20 @@ package io.quarkus.funqy.runtime.bindings.knative.events; +import static io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBindingRecorder.DATA_OBJECT_READER; +import static io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBindingRecorder.DATA_OBJECT_WRITER; +import static io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBindingRecorder.INPUT_CE_DATA_TYPE; +import static io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBindingRecorder.OUTPUT_CE_DATA_TYPE; import static io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBindingRecorder.RESPONSE_SOURCE; import static io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBindingRecorder.RESPONSE_TYPE; +import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Type; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.Executor; +import java.util.function.Consumer; import javax.enterprise.inject.Instance; import javax.enterprise.inject.spi.CDI; @@ -24,6 +31,7 @@ import io.quarkus.arc.ManagedContext; import io.quarkus.arc.runtime.BeanContainer; import io.quarkus.funqy.knative.events.CloudEvent; +import io.quarkus.funqy.knative.events.CloudEventBuilder; import io.quarkus.funqy.runtime.FunctionInvoker; import io.quarkus.funqy.runtime.FunctionRecorder; import io.quarkus.funqy.runtime.FunqyServerResponse; @@ -35,7 +43,9 @@ import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; import io.vertx.ext.web.RoutingContext; @@ -74,16 +84,24 @@ public VertxRequestHandler(Vertx vertx, @Override public void handle(RoutingContext routingContext) { - String mediaType = routingContext.request().getHeader("Content-Type"); - if (mediaType == null || mediaType.startsWith("application/json") || mediaType.trim().equals("")) { - if (routingContext.request().getHeader("ce-id") != null) { - binaryContentMode(routingContext); - } else { - regularFunqyHttp(routingContext); + final HttpServerRequest request = routingContext.request(); + final String mediaType = request.getHeader("Content-Type"); + boolean binaryCE = request.headers().contains("Ce-Id"); + boolean structuredCE = false; + if (mediaType != null) { + structuredCE = mediaType.startsWith("application/cloudevents+json"); + } + + if (structuredCE || binaryCE) { + try { + processCloudEvent(routingContext); + } catch (Throwable t) { + routingContext.fail(t); } - } else if (mediaType.startsWith("application/cloudevents+json")) { - structuredMode(routingContext); - } else if (mediaType.startsWith("application/cloudevents-batch+json")) { + } else if ((mediaType != null && mediaType.startsWith("application/json") && request.method() == HttpMethod.POST) || + request.method() == HttpMethod.GET) { + regularFunqyHttp(routingContext); + } else if (mediaType != null && mediaType.startsWith("application/cloudevents-batch+json")) { routingContext.fail(406); log.error("Batch mode not supported yet"); return; @@ -95,8 +113,261 @@ public void handle(RoutingContext routingContext) { } } - private static final ResponseProcessing NOOP = () -> { - }; + private void processCloudEvent(RoutingContext routingContext) { + final HttpServerRequest httpRequest = routingContext.request(); + final HttpServerResponse httpResponse = routingContext.response(); + final boolean binaryCE = httpRequest.headers().contains("ce-id"); + + httpRequest.bodyHandler(bodyBuff -> executor.execute(() -> { + try { + final String ceType; + final String ceSpecVersion; + final JsonNode structuredPayload; + + if (binaryCE) { + ceType = httpRequest.headers().get("ce-type"); + ceSpecVersion = httpRequest.headers().get("ce-specversion"); + structuredPayload = null; + } else { + try { + structuredPayload = mapper.readTree(bodyBuff.getBytes()); + ceType = structuredPayload.get("type").asText(); + ceSpecVersion = structuredPayload.get("specversion").asText(); + } catch (IOException e) { + routingContext.fail(e); + return; + } + } + + if (!isSupportedSpecVersion(ceSpecVersion)) { + log.errorf("Unexpected CloudEvent spec-version '%s'.", ceSpecVersion); + routingContext.fail(400); + return; + } + + final FunctionInvoker invoker; + if (defaultInvoker != null) { + invoker = defaultInvoker; + } else { + invoker = typeTriggers.get(ceType); + if (invoker == null) { + routingContext.fail(404); + log.error("Couldn't map CloudEvent type: '" + ceType + "' to a function."); + return; + } + } + + final Type inputCeDataType = (Type) invoker.getBindingContext().get(INPUT_CE_DATA_TYPE); + final Type outputCeDataType = (Type) invoker.getBindingContext().get(OUTPUT_CE_DATA_TYPE); + final Type innerInputType = inputCeDataType != null ? inputCeDataType : invoker.getInputType(); + final Type innerOutputType = outputCeDataType != null ? outputCeDataType : invoker.getOutputType(); + final ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(DATA_OBJECT_READER); + final ObjectWriter writer = (ObjectWriter) invoker.getBindingContext().get(DATA_OBJECT_WRITER); + + final CloudEvent inputCloudEvent; + final Object input; + if (invoker.hasInput()) { + if (binaryCE) { + inputCloudEvent = new HeaderCloudEventImpl<>( + httpRequest.headers(), + bodyBuff, + inputCeDataType != null ? inputCeDataType : innerInputType, + mapper, + reader); + } else { + inputCloudEvent = new JsonCloudEventImpl<>( + structuredPayload, + inputCeDataType != null ? inputCeDataType : innerInputType, + mapper, + reader); + } + if (inputCeDataType == null) { + // we need to unwrap user data from CloudEvent + input = inputCloudEvent.data(); + } else { + // user is explicitly handling CloudEvent + input = inputCloudEvent; + } + } else { + input = inputCloudEvent = null; + } + + final Consumer sendOutput = output -> { + try { + if (!invoker.hasOutput()) { + routingContext.response().setStatusCode(204); + routingContext.response().end(); + return; + } + + final CloudEvent outputCloudEvent; + if (outputCeDataType == null) { + // we need to wrap user data into CloudEvent + CloudEventBuilder builder = CloudEventBuilder.create(); + if (byte[].class.equals(innerOutputType)) { + outputCloudEvent = builder.build((byte[]) output, "application/octet-stream"); + } else { + outputCloudEvent = builder.build(output); + } + } else { + // user is explicitly returning CloudEvent + outputCloudEvent = (CloudEvent) output; + } + + String id = outputCloudEvent.id(); + if (id == null) { + id = getResponseId(); + } + String specVersion; + if (outputCloudEvent.specVersion() == null) { + specVersion = inputCloudEvent.specVersion().toString(); + } else { + specVersion = outputCloudEvent.specVersion().toString(); + } + String source = outputCloudEvent.source(); + if (source == null) { + source = (String) invoker.getBindingContext().get(RESPONSE_SOURCE); + } + String type = outputCloudEvent.type(); + if (type == null) { + type = (String) invoker.getBindingContext().get(RESPONSE_TYPE); + } + + boolean ceHasData = !Void.class.equals(innerInputType); + + if (binaryCE) { + httpResponse.putHeader("ce-id", id); + httpResponse.putHeader("ce-specversion", specVersion); + httpResponse.putHeader("ce-source", source); + httpResponse.putHeader("ce-type", type); + + if (outputCloudEvent.time() != null) { + httpResponse.putHeader("ce-time", outputCloudEvent.time().toString()); + } + + if (outputCloudEvent.subject() != null) { + httpResponse.putHeader("ce-subject", outputCloudEvent.subject()); + } + + if (outputCloudEvent.dataSchema() != null) { + String dsName = outputCloudEvent.specVersion().charAt(0) == '0' ? "ce-schemaurl" + : "ce-dataschema"; + httpResponse.putHeader(dsName, outputCloudEvent.dataSchema()); + } + + outputCloudEvent.extensions() + .entrySet() + .forEach(e -> httpResponse.putHeader("ce-" + e.getKey(), e.getValue())); + + String dataContentType = outputCloudEvent.dataContentType(); + if (dataContentType != null) { + httpResponse.putHeader("Content-Type", dataContentType); + } + + if (ceHasData) { + if (dataContentType != null && dataContentType.startsWith("application/json")) { + httpResponse.end(Buffer.buffer(writer.writeValueAsBytes(outputCloudEvent.data()))); + } else if (byte[].class.equals(innerOutputType)) { + httpResponse.end(Buffer.buffer((byte[]) outputCloudEvent.data())); + } else { + log.errorf("Don't know how to write ce to output (dataContentType: %s, javaType: %s).", + dataContentType, innerOutputType); + routingContext.fail(500); + return; + } + } else { + routingContext.response().setStatusCode(204); + routingContext.response().end(); + } + return; + } else { + final Map responseEvent = new HashMap<>(); + responseEvent.put("id", id); + responseEvent.put("specversion", specVersion); + responseEvent.put("source", source); + responseEvent.put("type", type); + + if (outputCloudEvent.time() != null) { + responseEvent.put("time", outputCloudEvent.time()); + } + + if (outputCloudEvent.subject() != null) { + responseEvent.put("subject", outputCloudEvent.subject()); + } + + if (outputCloudEvent.dataSchema() != null) { + String dsName = outputCloudEvent.specVersion().charAt(0) == '0' ? "schemaurl" : "dataschema"; + responseEvent.put(dsName, outputCloudEvent.dataSchema()); + } + + outputCloudEvent.extensions() + .entrySet() + .forEach(e -> responseEvent.put(e.getKey(), e.getValue())); + + String dataContentType = outputCloudEvent.dataContentType(); + if (dataContentType != null) { + responseEvent.put("datacontenttype", dataContentType); + } + + if (ceHasData) { + switch (specVersion.charAt(0)) { + case '1': + if (dataContentType != null && dataContentType.startsWith("application/json")) { + responseEvent.put("data", outputCloudEvent.data()); + } else if (byte[].class.equals(innerOutputType)) { + responseEvent.put("data_base64", (byte[]) outputCloudEvent.data()); + } else { + log.errorf( + "Don't know how to write ce to output (dataContentType: %s, javaType: %s).", + dataContentType, innerOutputType); + routingContext.fail(500); + return; + } + break; + case '0': + if (dataContentType != null && dataContentType.startsWith("application/json")) { + responseEvent.put("data", outputCloudEvent.data()); + } else if (byte[].class.equals(innerOutputType)) { + responseEvent.put("datacontentencoding", "base64"); + responseEvent.put("data", (byte[]) outputCloudEvent.data()); + } else { + log.errorf( + "Don't know how to write ce to output (dataContentType: %s, javaType: %s).", + dataContentType, innerOutputType); + routingContext.fail(500); + return; + } + break; + default: + throw new RuntimeException( + "Unsupported CloudEvent spec-version: '" + specVersion + "'."); + } + } + + routingContext.response().putHeader("Content-Type", "application/cloudevents+json"); + httpResponse.end(Buffer.buffer(mapper.writer().writeValueAsBytes(responseEvent))); + return; + } + } catch (Throwable t) { + routingContext.fail(t); + } + }; + + dispatch(inputCloudEvent, routingContext, invoker, input) + .getOutput() + .subscribe() + .with(sendOutput, t -> routingContext.fail(t)); + + } catch (Throwable t) { + routingContext.fail(t); + } + })); + + } + + private static boolean isSupportedSpecVersion(String ceSpecVersion) { + return (ceSpecVersion.charAt(0) == '0' || ceSpecVersion.charAt(0) == '1') && ceSpecVersion.charAt(1) == '.'; + } private void regularFunqyHttp(RoutingContext routingContext) { String path = routingContext.request().path(); @@ -118,39 +389,25 @@ private void regularFunqyHttp(RoutingContext routingContext) { } else { invoker = defaultInvoker; } - processHttpRequest(null, routingContext, NOOP, invoker); - } - private void binaryContentMode(RoutingContext routingContext) { - String ceType = routingContext.request().getHeader("ce-type"); - FunctionInvoker invoker = defaultInvoker; if (invoker == null) { - // map by type trigger - invoker = typeTriggers.get(ceType); - if (invoker == null) { - routingContext.fail(404); - log.error("Could not map ce-type header: " + ceType + " to a function"); - return; - } + routingContext.fail(404); + log.error("There is no function matching the path."); + return; + } + if (invoker.getBindingContext().get(INPUT_CE_DATA_TYPE) != null || + invoker.getBindingContext().get(OUTPUT_CE_DATA_TYPE) != null) { + routingContext.fail(400); + log.errorf("Bad request: the '%s' function expects CloudEvent, but plain HTTP was received.", + invoker.getName()); + return; } - final FunctionInvoker targetInvoker = invoker; - processHttpRequest(new HeaderCloudEventImpl(routingContext.request()), routingContext, () -> { - routingContext.response().putHeader("ce-id", getResponseId()); - routingContext.response().putHeader("ce-specversion", "1.0"); - routingContext.response().putHeader("ce-source", - (String) targetInvoker.getBindingContext().get(RESPONSE_SOURCE)); - routingContext.response().putHeader("ce-type", - (String) targetInvoker.getBindingContext().get(RESPONSE_TYPE)); - }, invoker); - } - @FunctionalInterface - interface ResponseProcessing { - void handle(); + processHttpRequest(null, routingContext, invoker); } - private void processHttpRequest(CloudEvent event, RoutingContext routingContext, ResponseProcessing handler, + private void processHttpRequest(CloudEvent event, RoutingContext routingContext, FunctionInvoker invoker) { if (routingContext.request().method() == HttpMethod.GET) { Object input = null; @@ -165,7 +422,7 @@ private void processHttpRequest(CloudEvent event, RoutingContext routingContext, } } try { - execute(event, routingContext, handler, invoker, input); + execute(event, routingContext, invoker, input); } catch (Throwable t) { log.error(t); routingContext.fail(500, t); @@ -176,7 +433,7 @@ private void processHttpRequest(CloudEvent event, RoutingContext routingContext, Object input = null; if (buff.length() > 0) { ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf()); - ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName()); + ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(DATA_OBJECT_READER); try { input = reader.readValue((InputStream) in); } catch (JsonProcessingException e) { @@ -185,7 +442,7 @@ private void processHttpRequest(CloudEvent event, RoutingContext routingContext, return; } } - execute(event, routingContext, handler, invoker, input); + execute(event, routingContext, invoker, input); } catch (Throwable t) { log.error(t); routingContext.fail(500, t); @@ -198,7 +455,7 @@ private void processHttpRequest(CloudEvent event, RoutingContext routingContext, } - private void execute(CloudEvent event, RoutingContext routingContext, ResponseProcessing handler, FunctionInvoker invoker, + private void execute(CloudEvent event, RoutingContext routingContext, FunctionInvoker invoker, Object finalInput) { executor.execute(() -> { try { @@ -210,9 +467,8 @@ private void execute(CloudEvent event, RoutingContext routingContext, ResponsePr if (invoker.hasOutput()) { try { httpResponse.setStatusCode(200); - handler.handle(); ObjectWriter writer = (ObjectWriter) invoker.getBindingContext() - .get(ObjectWriter.class.getName()); + .get(DATA_OBJECT_WRITER); httpResponse.putHeader("Content-Type", "application/json"); httpResponse.end(writer.writeValueAsString(obj)); } catch (JsonProcessingException jpe) { @@ -235,110 +491,6 @@ private void execute(CloudEvent event, RoutingContext routingContext, ResponsePr }); } - private void structuredMode(RoutingContext routingContext) { - if (routingContext.request().method() != HttpMethod.POST) { - routingContext.fail(405); - log.error("Must be POST method"); - return; - } - routingContext.request().bodyHandler(buff -> { - try { - ByteBufInputStream in = new ByteBufInputStream(buff.getByteBuf()); - Object input = null; - JsonNode event; - try { - event = mapper.reader().readTree((InputStream) in); - } catch (JsonProcessingException e) { - log.error("Failed to unmarshal input", e); - routingContext.fail(400); - return; - } - FunctionInvoker invoker = defaultInvoker; - if (invoker == null) { - String eventType = event.get("type").asText(); - invoker = typeTriggers.get(eventType); - if (invoker == null) { - routingContext.fail(404); - log.error("Could not map json cloud event to function: " + eventType); - return; - } - - } - final FunctionInvoker targetInvoker = invoker; - if (invoker.hasInput()) { - - JsonNode dct = event.get("datacontenttype"); - if (dct == null) { - routingContext.fail(400); - return; - } - String type = dct.asText(); - if (type != null) { - if (!type.equals("application/json")) { - routingContext.fail(406); - log.error("Illegal datacontenttype"); - return; - } - JsonNode data = event.get("data"); - if (data != null) { - ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName()); - try { - input = reader.readValue(data); - } catch (JsonProcessingException e) { - log.error("Failed to unmarshal input", e); - routingContext.fail(400); - return; - } - } - } - } - Object finalInput = input; - - executor.execute(() -> { - try { - final HttpServerResponse httpResponse = routingContext.response(); - final FunqyServerResponse response = dispatch(new JsonCloudEventImpl(event), routingContext, - targetInvoker, finalInput); - - response.getOutput().emitOn(executor).subscribe().with( - obj -> { - if (targetInvoker.hasOutput()) { - httpResponse.setStatusCode(200); - final Map responseEvent = new HashMap<>(); - - responseEvent.put("id", getResponseId()); - responseEvent.put("specversion", "1.0"); - responseEvent.put("source", - targetInvoker.getBindingContext().get(RESPONSE_SOURCE)); - responseEvent.put("type", - targetInvoker.getBindingContext().get(RESPONSE_TYPE)); - responseEvent.put("datacontenttype", "application/json"); - responseEvent.put("data", obj); - try { - httpResponse.end(mapper.writer().writeValueAsString(responseEvent)); - } catch (JsonProcessingException e) { - log.error("Failed to marshal", e); - routingContext.fail(400); - } - } else { - httpResponse.setStatusCode(204); - httpResponse.end(); - } - }, - t -> routingContext.fail(t)); - - } catch (Throwable t) { - log.error(t); - routingContext.fail(500, t); - } - }); - } catch (Throwable t) { - log.error(t); - routingContext.fail(500, t); - } - }); - } - private String getResponseId() { return UUID.randomUUID().toString(); } diff --git a/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java b/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java index 35737a2390ce7..f633eb47d64c5 100644 --- a/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java +++ b/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java @@ -55,7 +55,7 @@ public FunctionInvoker(String name, Class targetClass, Method method) { "Uni must be used with type parameter (e.g. Uni)."); } } else { - outputType = returnType; + outputType = method.getGenericReturnType(); } } }