From 6a274a89e10c9dba8eb9c31b0f9e6f7669065305 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Mon, 19 Sep 2022 10:55:01 +0200 Subject: [PATCH 1/4] Adding a filter for custom error respnse codes to be added as CE extensions Signed-off-by: Matthias Wessendorf --- .../broker/dispatcher/impl/RecordDispatcherImpl.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index e3fa0b59ea..21a51d7c8b 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -77,6 +77,7 @@ public class RecordDispatcherImpl implements RecordDispatcher { private static final String KN_ERROR_DEST_EXT_NAME = "knativeerrordest"; private static final String KN_ERROR_CODE_EXT_NAME = "knativeerrorcode"; private static final String KN_ERROR_DATA_EXT_NAME = "knativeerrordata"; + private static final String EKB_ERROR_PREFIX = "kne-"; private static final int KN_ERROR_DATA_MAX_BYTES = 1024; private final Filter filter; @@ -292,6 +293,14 @@ private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordC extensions.put(KN_ERROR_DEST_EXT_NAME, destination); extensions.put(KN_ERROR_CODE_EXT_NAME, String.valueOf(response.statusCode())); + // transform all headers keys to be lowercase and filter by prefix. + // afterwards extract the prefix and add them to the extensions map + response.headers().forEach((k, v) -> { + if (k.regionMatches(true, 0, EKB_ERROR_PREFIX, 0, EKB_ERROR_PREFIX.length())) { // aka startsWithIgnoreCase + extensions.put(k.substring(EKB_ERROR_PREFIX.length()).toLowerCase(), v); + } + }); + // we extract the response as byte array as we do not need a string // representation of it var data = response.bodyAsBuffer(); From 9baa960450049443a6060429db0e2b7a6ba3377b Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Mon, 19 Sep 2022 13:39:26 +0200 Subject: [PATCH 2/4] Adding simple unit test Signed-off-by: Matthias Wessendorf --- .../dispatcher/impl/RecordDispatcherTest.java | 71 ++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java index a94c0e919a..54441899e8 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java @@ -300,6 +300,71 @@ final var record = record(); assertNoDiscardedEventCount(); } + @Test + public void failedEventsShouldBeEnhancedWithCustomHttpHeaders() { + + final var subscriberSenderSendCalled = new AtomicBoolean(false); + final var dlsSenderSendCalled = new AtomicBoolean(false); + final RecordDispatcherListener receiver = offsetManagerMock(); + + int errorCode = 422; + String errorBody = "{ \"message\": \"bad bad things happened\" }"; + String validErrorKey = "kne-testerror"; + String invalidErrorKey = "something"; + MultiMap headerMap = MultiMap.caseInsensitiveMultiMap().add(validErrorKey, "hello").add(invalidErrorKey,"nope"); + + final var dispatcherHandler = new RecordDispatcherImpl( + resourceContext, + value -> true, + new CloudEventSenderMock( + record -> { + subscriberSenderSendCalled.set(true); + return Future.failedFuture(new ResponseFailureException(makeHttpResponseWithHeaders(errorCode, errorBody, headerMap), "")); + } + ), + new CloudEventSenderMock( + record -> { + dlsSenderSendCalled.set(true); + return Future.succeededFuture(); + } + ), new ResponseHandlerMock(), + receiver, + null, + registry + ); + final var record = record(); + dispatcherHandler.dispatch(record); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(KafkaConsumerRecord.class); + + assertTrue(subscriberSenderSendCalled.get()); + assertTrue(dlsSenderSendCalled.get()); + verify(receiver, times(1)).recordReceived(record); + verify(receiver, times(1)).successfullySentToDeadLetterSink(captor.capture()); + verify(receiver, never()).successfullySentToSubscriber(any()); + verify(receiver, never()).failedToSendToDeadLetterSink(any(), any()); + verify(receiver, never()).recordDiscarded(any()); + + KafkaConsumerRecord failedRecord = captor.getValue(); + assertEquals(record.topic(), failedRecord.topic()); + assertEquals(record.partition(), failedRecord.partition()); + assertEquals(record.offset(), failedRecord.offset()); + assertEquals(record.key(), failedRecord.key()); + assertEquals(record.value().getId(), failedRecord.value().getId()); + assertEquals(record.value().getAttributeNames(), failedRecord.value().getAttributeNames()); + assertEquals(record.value().getData(), failedRecord.value().getData()); + assertEquals("testdest", failedRecord.value().getExtension("knativeerrordest")); + assertEquals(String.valueOf(errorCode), failedRecord.value().getExtension("knativeerrorcode")); + assertEquals("hello", failedRecord.value().getExtension("testerror")); + assertThat(failedRecord.value().getExtension("something")).isNull(); + assertEquals(Base64.getEncoder().encodeToString(errorBody.getBytes()), failedRecord.value().getExtension("knativeerrordata")); + + assertEventDispatchLatency(); + assertEventProcessingLatency(); + assertEventCount(); + assertNoDiscardedEventCount(); + } + @Test public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDlsBodyTooLarge() { @@ -362,11 +427,15 @@ final var record = record(); } private HttpResponseImpl makeHttpResponse(int statusCode, String body) { + return makeHttpResponseWithHeaders(statusCode, body, MultiMap.caseInsensitiveMultiMap()); + } + + private HttpResponseImpl makeHttpResponseWithHeaders(int statusCode, String body, MultiMap headers) { return new HttpResponseImpl( HttpVersion.HTTP_2, statusCode, "", - MultiMap.caseInsensitiveMultiMap(), + headers, MultiMap.caseInsensitiveMultiMap(), Collections.emptyList(), Buffer.buffer(body, "UTF-8"), From f284df5e44ea0da4ca90ec582e624bb098f804aa Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Mon, 19 Sep 2022 14:49:11 +0200 Subject: [PATCH 3/4] Adding rekt test Signed-off-by: Matthias Wessendorf --- test/e2e_new/dls_extensions_test.go | 68 +++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/test/e2e_new/dls_extensions_test.go b/test/e2e_new/dls_extensions_test.go index 893b14f6de..008c3d41e0 100644 --- a/test/e2e_new/dls_extensions_test.go +++ b/test/e2e_new/dls_extensions_test.go @@ -59,6 +59,7 @@ func TestDeadLetterSinkExtensions(t *testing.T) { env.Test(ctx, t, SubscriberReturnedErrorSmallData()) env.Test(ctx, t, SubscriberReturnedErrorLargeData()) env.Test(ctx, t, SubscriberReturnedHtmlWebpage()) + env.Test(ctx, t, SubscriberReturnedCustomExtensionHeader()) } func SubscriberUnreachable() *feature.Feature { @@ -361,6 +362,73 @@ func SubscriberReturnedHtmlWebpage() *feature.Feature { return f } +func SubscriberReturnedCustomExtensionHeader() *feature.Feature { + f := feature.NewFeature() + + sourceName := feature.MakeRandomK8sName("source") + sinkName := feature.MakeRandomK8sName("sink") + deadLetterSinkName := feature.MakeRandomK8sName("dls") + triggerName := feature.MakeRandomK8sName("trigger") + brokerName := feature.MakeRandomK8sName("broker") + + ev := cetest.FullEvent() + + f.Setup("install one partition configuration", single_partition_config.Install) + f.Setup("install broker", broker.Install( + brokerName, + broker.WithBrokerClass(kafka.BrokerClass), + broker.WithConfig(single_partition_config.ConfigMapName), + )) + f.Setup("broker is ready", broker.IsReady(brokerName)) + f.Setup("broker is addressable", broker.IsAddressable(brokerName)) + + errorData := `{ "message": "catastrophic failure" }` + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + eventshub.DropFirstN(1), + eventshub.DropEventsResponseCode(422), + eventshub.DropEventsResponseHeaders(map[string]string{"Kne-Test": "foo"}), + eventshub.DropEventsResponseBody(errorData), + )) + f.Setup("install dead letter sink", eventshub.Install( + deadLetterSinkName, + eventshub.StartReceiver, + )) + f.Setup("install trigger", trigger.Install( + triggerName, + brokerName, + trigger.WithSubscriber(svc.AsKReference(sinkName), ""), + trigger.WithDeadLetterSink(svc.AsKReference(deadLetterSinkName), ""), + )) + f.Setup("trigger is ready", trigger.IsReady(triggerName)) + + f.Requirement("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(ev), + )) + + f.Assert("knativeerrordest, knativeerrorcode, knativeerrordata and custom extension header added", assertEnhancedWithKnativeErrorExtensions( + deadLetterSinkName, + func(ctx context.Context) cetest.EventMatcher { + sinkAddress, _ := svc.Address(ctx, sinkName) + return cetest.HasExtension("knativeerrordest", sinkAddress.String()) + }, + func(ctx context.Context) cetest.EventMatcher { + return cetest.HasExtension("knativeerrorcode", "422") + }, + func(ctx context.Context) cetest.EventMatcher { + return cetest.HasExtension("test", "foo") + }, + func(ctx context.Context) cetest.EventMatcher { + return cetest.HasExtension("knativeerrordata", base64.StdEncoding.EncodeToString([]byte(errorData))) + }, + )) + + return f +} + func assertEnhancedWithKnativeErrorExtensions(sinkName string, matcherfns ...func(ctx context.Context) cetest.EventMatcher) feature.StepFn { return func(ctx context.Context, t feature.T) { matchers := make([]cetest.EventMatcher, len(matcherfns)) From 061363594cfb8979b34e1b2c4e24cdab9d4f8166 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Mon, 19 Sep 2022 14:59:00 +0200 Subject: [PATCH 4/4] :lipstick: update comment Signed-off-by: Matthias Wessendorf --- .../kafka/broker/dispatcher/impl/RecordDispatcherImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 21a51d7c8b..15c87fe625 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -293,8 +293,7 @@ private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordC extensions.put(KN_ERROR_DEST_EXT_NAME, destination); extensions.put(KN_ERROR_CODE_EXT_NAME, String.valueOf(response.statusCode())); - // transform all headers keys to be lowercase and filter by prefix. - // afterwards extract the prefix and add them to the extensions map + // match for prefixed headers and put them to our extensions map response.headers().forEach((k, v) -> { if (k.regionMatches(true, 0, EKB_ERROR_PREFIX, 0, EKB_ERROR_PREFIX.length())) { // aka startsWithIgnoreCase extensions.put(k.substring(EKB_ERROR_PREFIX.length()).toLowerCase(), v);