Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a filter for custom error respnse codes to be added as CE extensions #2657

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class RecordDispatcherImpl implements RecordDispatcher {
private static final String KN_ERROR_DEST_EXT_NAME = "knativeerrordest";
private static final String KN_ERROR_CODE_EXT_NAME = "knativeerrorcode";
private static final String KN_ERROR_DATA_EXT_NAME = "knativeerrordata";
private static final String EKB_ERROR_PREFIX = "kne-";
Copy link
Contributor Author

@matzew matzew Sep 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pierDipi I have no idea what is better kne- as in Knative Error?

Or ekb-, as in "eventing kafka broker", as that is more generic

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like kne-

private static final int KN_ERROR_DATA_MAX_BYTES = 1024;

private final Filter filter;
Expand Down Expand Up @@ -292,6 +293,13 @@ private ConsumerRecordContext errorTransform(final ConsumerRecordContext recordC
extensions.put(KN_ERROR_DEST_EXT_NAME, destination);
extensions.put(KN_ERROR_CODE_EXT_NAME, String.valueOf(response.statusCode()));

// match for prefixed headers and put them to our extensions map
response.headers().forEach((k, v) -> {
if (k.regionMatches(true, 0, EKB_ERROR_PREFIX, 0, EKB_ERROR_PREFIX.length())) { // aka startsWithIgnoreCase
extensions.put(k.substring(EKB_ERROR_PREFIX.length()).toLowerCase(), v);
}
});

// we extract the response as byte array as we do not need a string
// representation of it
var data = response.bodyAsBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,71 @@ final var record = record();
assertNoDiscardedEventCount();
}

@Test
public void failedEventsShouldBeEnhancedWithCustomHttpHeaders() {

final var subscriberSenderSendCalled = new AtomicBoolean(false);
final var dlsSenderSendCalled = new AtomicBoolean(false);
final RecordDispatcherListener receiver = offsetManagerMock();

int errorCode = 422;
String errorBody = "{ \"message\": \"bad bad things happened\" }";
String validErrorKey = "kne-testerror";
String invalidErrorKey = "something";
MultiMap headerMap = MultiMap.caseInsensitiveMultiMap().add(validErrorKey, "hello").add(invalidErrorKey,"nope");

final var dispatcherHandler = new RecordDispatcherImpl(
resourceContext,
value -> true,
new CloudEventSenderMock(
record -> {
subscriberSenderSendCalled.set(true);
return Future.failedFuture(new ResponseFailureException(makeHttpResponseWithHeaders(errorCode, errorBody, headerMap), ""));
}
),
new CloudEventSenderMock(
record -> {
dlsSenderSendCalled.set(true);
return Future.succeededFuture();
}
), new ResponseHandlerMock(),
receiver,
null,
registry
);
final var record = record();
dispatcherHandler.dispatch(record);

ArgumentCaptor<KafkaConsumerRecord<Object, CloudEvent>> captor = ArgumentCaptor.forClass(KafkaConsumerRecord.class);

assertTrue(subscriberSenderSendCalled.get());
assertTrue(dlsSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).successfullySentToDeadLetterSink(captor.capture());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).recordDiscarded(any());

KafkaConsumerRecord<Object, CloudEvent> failedRecord = captor.getValue();
assertEquals(record.topic(), failedRecord.topic());
assertEquals(record.partition(), failedRecord.partition());
assertEquals(record.offset(), failedRecord.offset());
assertEquals(record.key(), failedRecord.key());
assertEquals(record.value().getId(), failedRecord.value().getId());
assertEquals(record.value().getAttributeNames(), failedRecord.value().getAttributeNames());
assertEquals(record.value().getData(), failedRecord.value().getData());
assertEquals("testdest", failedRecord.value().getExtension("knativeerrordest"));
assertEquals(String.valueOf(errorCode), failedRecord.value().getExtension("knativeerrorcode"));
assertEquals("hello", failedRecord.value().getExtension("testerror"));
assertThat(failedRecord.value().getExtension("something")).isNull();
assertEquals(Base64.getEncoder().encodeToString(errorBody.getBytes()), failedRecord.value().getExtension("knativeerrordata"));

assertEventDispatchLatency();
assertEventProcessingLatency();
assertEventCount();
assertNoDiscardedEventCount();
}

@Test
public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDlsBodyTooLarge() {

Expand Down Expand Up @@ -362,11 +427,15 @@ final var record = record();
}

private HttpResponseImpl<Buffer> makeHttpResponse(int statusCode, String body) {
return makeHttpResponseWithHeaders(statusCode, body, MultiMap.caseInsensitiveMultiMap());
}

private HttpResponseImpl<Buffer> makeHttpResponseWithHeaders(int statusCode, String body, MultiMap headers) {
return new HttpResponseImpl<Buffer>(
HttpVersion.HTTP_2,
statusCode,
"",
MultiMap.caseInsensitiveMultiMap(),
headers,
MultiMap.caseInsensitiveMultiMap(),
Collections.emptyList(),
Buffer.buffer(body, "UTF-8"),
Expand Down
68 changes: 68 additions & 0 deletions test/e2e_new/dls_extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestDeadLetterSinkExtensions(t *testing.T) {
env.Test(ctx, t, SubscriberReturnedErrorSmallData())
env.Test(ctx, t, SubscriberReturnedErrorLargeData())
env.Test(ctx, t, SubscriberReturnedHtmlWebpage())
env.Test(ctx, t, SubscriberReturnedCustomExtensionHeader())
}

func SubscriberUnreachable() *feature.Feature {
Expand Down Expand Up @@ -361,6 +362,73 @@ func SubscriberReturnedHtmlWebpage() *feature.Feature {
return f
}

func SubscriberReturnedCustomExtensionHeader() *feature.Feature {
f := feature.NewFeature()

sourceName := feature.MakeRandomK8sName("source")
sinkName := feature.MakeRandomK8sName("sink")
deadLetterSinkName := feature.MakeRandomK8sName("dls")
triggerName := feature.MakeRandomK8sName("trigger")
brokerName := feature.MakeRandomK8sName("broker")

ev := cetest.FullEvent()

f.Setup("install one partition configuration", single_partition_config.Install)
f.Setup("install broker", broker.Install(
brokerName,
broker.WithBrokerClass(kafka.BrokerClass),
broker.WithConfig(single_partition_config.ConfigMapName),
))
f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))

errorData := `{ "message": "catastrophic failure" }`
f.Setup("install sink", eventshub.Install(
sinkName,
eventshub.StartReceiver,
eventshub.DropFirstN(1),
eventshub.DropEventsResponseCode(422),
eventshub.DropEventsResponseHeaders(map[string]string{"Kne-Test": "foo"}),
eventshub.DropEventsResponseBody(errorData),
))
f.Setup("install dead letter sink", eventshub.Install(
deadLetterSinkName,
eventshub.StartReceiver,
))
f.Setup("install trigger", trigger.Install(
triggerName,
brokerName,
trigger.WithSubscriber(svc.AsKReference(sinkName), ""),
trigger.WithDeadLetterSink(svc.AsKReference(deadLetterSinkName), ""),
))
f.Setup("trigger is ready", trigger.IsReady(triggerName))

f.Requirement("install source", eventshub.Install(
sourceName,
eventshub.StartSenderToResource(broker.GVR(), brokerName),
eventshub.InputEvent(ev),
))

f.Assert("knativeerrordest, knativeerrorcode, knativeerrordata and custom extension header added", assertEnhancedWithKnativeErrorExtensions(
deadLetterSinkName,
func(ctx context.Context) cetest.EventMatcher {
sinkAddress, _ := svc.Address(ctx, sinkName)
return cetest.HasExtension("knativeerrordest", sinkAddress.String())
},
func(ctx context.Context) cetest.EventMatcher {
return cetest.HasExtension("knativeerrorcode", "422")
},
func(ctx context.Context) cetest.EventMatcher {
return cetest.HasExtension("test", "foo")
},
func(ctx context.Context) cetest.EventMatcher {
return cetest.HasExtension("knativeerrordata", base64.StdEncoding.EncodeToString([]byte(errorData)))
},
))

return f
}

func assertEnhancedWithKnativeErrorExtensions(sinkName string, matcherfns ...func(ctx context.Context) cetest.EventMatcher) feature.StepFn {
return func(ctx context.Context, t feature.T) {
matchers := make([]cetest.EventMatcher, len(matcherfns))
Expand Down