
Commit

Adding simple unit test
Signed-off-by: Matthias Wessendorf <[email protected]>
matzew committed Sep 19, 2022
1 parent 7db2829 commit 86d702c
Showing 1 changed file with 70 additions and 1 deletion.
@@ -300,6 +300,71 @@ final var record = record();
assertNoDiscardedEventCount();
}

@Test
public void failedEventsShouldBeEnhancedWithCustomHttpHeaders() {
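// The subscriber rejects the event with an HTTP 422 response carrying custom headers; the event is
// then expected to be routed to the dead letter sink, enhanced with knativeerror* extensions.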

final var subscriberSenderSendCalled = new AtomicBoolean(false);
final var dlsSenderSendCalled = new AtomicBoolean(false);
final RecordDispatcherListener receiver = offsetManagerMock();

int errorCode = 422;
String errorBody = "{ \"message\": \"bad bad things happened\" }";
String validErrorKey = "kne-testerror";
String invalidErrorKey = "something";
MultiMap headerMap = MultiMap.caseInsensitiveMultiMap().add(validErrorKey, "hello").add(invalidErrorKey, "nope");
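// Only the "kne-"-prefixed header should survive as a CloudEvent extension ("testerror");
// the unprefixed "something" header should be dropped.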

final var dispatcherHandler = new RecordDispatcherImpl(
resourceContext,
value -> true,
new CloudEventSenderMock(
record -> {
subscriberSenderSendCalled.set(true);
return Future.failedFuture(new ResponseFailureException(makeHttpResponseWithHeaders(errorCode, errorBody, headerMap), ""));
}
),
new CloudEventSenderMock(
record -> {
dlsSenderSendCalled.set(true);
return Future.succeededFuture();
}
), new ResponseHandlerMock(),
receiver,
null,
registry
);
final var record = record();
dispatcherHandler.dispatch(record);

ArgumentCaptor<KafkaConsumerRecord<Object, CloudEvent>> captor = ArgumentCaptor.forClass(KafkaConsumerRecord.class);
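// Captures the record handed to successfullySentToDeadLetterSink(...) so its extensions can be inspected.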

assertTrue(subscriberSenderSendCalled.get());
assertTrue(dlsSenderSendCalled.get());
verify(receiver, times(1)).recordReceived(record);
verify(receiver, times(1)).successfullySentToDeadLetterSink(captor.capture());
verify(receiver, never()).successfullySentToSubscriber(any());
verify(receiver, never()).failedToSendToDeadLetterSink(any(), any());
verify(receiver, never()).recordDiscarded(any());

KafkaConsumerRecord<Object, CloudEvent> failedRecord = captor.getValue();
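// The record sent to the dead letter sink keeps the original coordinates and payload,
// and carries the knativeerror* extensions plus the propagated custom header.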
assertEquals(record.topic(), failedRecord.topic());
assertEquals(record.partition(), failedRecord.partition());
assertEquals(record.offset(), failedRecord.offset());
assertEquals(record.key(), failedRecord.key());
assertEquals(record.value().getId(), failedRecord.value().getId());
assertEquals(record.value().getAttributeNames(), failedRecord.value().getAttributeNames());
assertEquals(record.value().getData(), failedRecord.value().getData());
assertEquals("testdest", failedRecord.value().getExtension("knativeerrordest"));
assertEquals(String.valueOf(errorCode), failedRecord.value().getExtension("knativeerrorcode"));
assertEquals("hello", failedRecord.value().getExtension("testerror"));
assertThat(failedRecord.value().getExtension("something")).isNull();
assertEquals(Base64.getEncoder().encodeToString(errorBody.getBytes()), failedRecord.value().getExtension("knativeerrordata"));

assertEventDispatchLatency();
assertEventProcessingLatency();
assertEventCount();
assertNoDiscardedEventCount();
}

@Test
public void failedEventsShouldBeEnhancedWithErrorExtensionsPriorToSendingToDlsBodyTooLarge() {

@@ -362,11 +427,15 @@ final var record = record();
}

private HttpResponseImpl<Buffer> makeHttpResponse(int statusCode, String body) {
return makeHttpResponseWithHeaders(statusCode, body, MultiMap.caseInsensitiveMultiMap());
}

private HttpResponseImpl<Buffer> makeHttpResponseWithHeaders(int statusCode, String body, MultiMap headers) {
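// Builds an HttpResponseImpl with the supplied headers so tests can simulate
// subscriber error responses that carry custom HTTP headers.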
return new HttpResponseImpl<Buffer>(
HttpVersion.HTTP_2,
statusCode,
"",
-MultiMap.caseInsensitiveMultiMap(),
+headers,
MultiMap.caseInsensitiveMultiMap(),
Collections.emptyList(),
Buffer.buffer(body, "UTF-8"),
