From 006905c65b74dd3bb7e91d7de96edce4d7f86cdf Mon Sep 17 00:00:00 2001 From: boddissattva <58807088+boddissattva@users.noreply.github.com> Date: Mon, 9 Dec 2024 17:32:24 +0100 Subject: [PATCH 1/4] fix(action): Kafka multiple values header - tests --- .../kafka/KafkaBasicConsumeActionTest.java | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionTest.java b/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionTest.java index f4d70c94e..2eaccb76c 100644 --- a/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionTest.java +++ b/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionTest.java @@ -16,6 +16,7 @@ import static com.chutneytesting.action.kafka.KafkaBasicConsumeAction.OUTPUT_PAYLOADS; import static com.chutneytesting.action.spi.ActionExecutionResult.Status.Failure; import static com.chutneytesting.action.spi.ActionExecutionResult.Status.Success; +import static java.util.Arrays.asList; import static java.util.Arrays.stream; import static java.util.Collections.emptyMap; import static java.util.Collections.shuffle; @@ -85,7 +86,7 @@ public void before() { @Test void should_set_inputs_default_values() { - KafkaBasicConsumeAction defaultAction = new KafkaBasicConsumeAction(null, null, null, null, null, null, null, null, null, null, null,null); + KafkaBasicConsumeAction defaultAction = new KafkaBasicConsumeAction(null, null, null, null, null, null, null, null, null, null, null, null); assertThat(defaultAction) .hasFieldOrPropertyWithValue("topic", null) .hasFieldOrPropertyWithValue("group", null) @@ -124,7 +125,7 @@ void should_validate_all_mandatory_inputs() { @Test void should_validate_timeout_input() { String badTimeout = "twenty seconds"; - KafkaBasicConsumeAction defaultAction = new KafkaBasicConsumeAction(TARGET_STUB, "topic", "group", null, null, null, null, null, badTimeout, null,null, null); + KafkaBasicConsumeAction defaultAction = new KafkaBasicConsumeAction(TARGET_STUB, "topic", "group", null, null, null, null, null, badTimeout, null, null, null); List errors = defaultAction.validateInputs(); @@ -135,7 +136,7 @@ void should_validate_timeout_input() { @Test void should_validate_ackMode_input() { String badTackMode = "UNKNOWN_ACKMODE"; - KafkaBasicConsumeAction defaultAction = new KafkaBasicConsumeAction(TARGET_STUB, "topic", "group", null, null, null, null, null, null, badTackMode, null,null); + KafkaBasicConsumeAction defaultAction = new KafkaBasicConsumeAction(TARGET_STUB, "topic", "group", null, null, null, null, null, null, badTackMode, null, null); List errors = defaultAction.validateInputs(); @@ -167,7 +168,7 @@ void should_merge_kafka_consumer_target_properties_with_input_properties() { propertyToOverride, "a property value" ); - KafkaBasicConsumeAction defaultAction = new KafkaBasicConsumeAction(target, null, null, properties, null, null, null, null, null, null, null,null); + KafkaBasicConsumeAction defaultAction = new KafkaBasicConsumeAction(target, null, null, properties, null, null, null, null, null, null, null, null); assertThat(defaultAction) .hasFieldOrPropertyWithValue("properties", expectedConfig) ; @@ -198,6 +199,31 @@ public void should_consume_simple_text_message() { assertThat(logger.errors).isEmpty(); } + @Test + public void should_consume_simple_text_message_with_multiple_values_header() { + // Given + ImmutableList
headers = ImmutableList.of( + new RecordHeader("key-with-multiple-values", "value 1".getBytes()), + new RecordHeader("key-with-multiple-values", "value 2".getBytes()) + ); + Action sut = givenKafkaConsumeAction(null, TEXT_PLAIN_VALUE, null); + givenActionReceiveMessages(sut, + buildRecord(FIRST_OFFSET, "KEY", "test message", headers) + ); + + // When + ActionExecutionResult actionExecutionResult = sut.execute(); + + // Then + assertThat(actionExecutionResult.status).isEqualTo(Success); + var result_headers = (List>) actionExecutionResult.outputs.get(OUTPUT_BODY_HEADERS_KEY); + assertThat(result_headers).hasSize(1); + assertThat(result_headers.get(0)) + .containsExactly( + Map.entry("key-with-multiple-values", asList("value 1", "value 2")) + ); + } + @Test public void should_not_find_any_message() { // Given @@ -358,9 +384,11 @@ public void should_select_message_whose_headers_match_given_payload_jsonpath_sel // Given ImmutableList
headers = ImmutableList.of( new RecordHeader("X-Custom-HeaderKey", "X-Custom-HeaderValue".getBytes()), - new RecordHeader("header", "123".getBytes()) + new RecordHeader("header", "123".getBytes()), + new RecordHeader("multi-values-header", "value 1".getBytes()), + new RecordHeader("multi-values-header", "value 2".getBytes()) ); - Action action = givenKafkaConsumeAction(3, null, "$..[?($.header=='123')]", null, null); + Action action = givenKafkaConsumeAction(3, null, "$..[?($.header=='123' && $.multi-values-header contains 'value 1')]", null, null); String textMessageToSelect = "first text message"; String xmlMessageToSelect = "first xml message"; String jsonMessageToSelect = "first json message"; @@ -537,7 +565,7 @@ private KafkaBasicConsumeAction givenKafkaConsumeAction(String selector, String } private KafkaBasicConsumeAction givenKafkaConsumeAction(int expectedMessageNb, String selector, String headerSelector, String mimeType, String timeout) { - return new KafkaBasicConsumeAction(TARGET_STUB, TOPIC, GROUP, emptyMap(), expectedMessageNb, selector, headerSelector, mimeType, timeout, null,null, logger); + return new KafkaBasicConsumeAction(TARGET_STUB, TOPIC, GROUP, emptyMap(), expectedMessageNb, selector, headerSelector, mimeType, timeout, null, null, logger); } private void givenActionReceiveMessages(Action action, ConsumerRecord... messages) { From 8e3016789c7fdf6f8425d5a23b75b7cfd3587d91 Mon Sep 17 00:00:00 2001 From: boddissattva <58807088+boddissattva@users.noreply.github.com> Date: Mon, 9 Dec 2024 18:32:04 +0100 Subject: [PATCH 2/4] fix(action): Kafka multiple values header - implem --- .../action/kafka/KafkaBasicConsumeAction.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/chutney/action-impl/src/main/java/com/chutneytesting/action/kafka/KafkaBasicConsumeAction.java b/chutney/action-impl/src/main/java/com/chutneytesting/action/kafka/KafkaBasicConsumeAction.java index 457860640..778daee33 100644 --- a/chutney/action-impl/src/main/java/com/chutneytesting/action/kafka/KafkaBasicConsumeAction.java +++ b/chutney/action-impl/src/main/java/com/chutneytesting/action/kafka/KafkaBasicConsumeAction.java @@ -16,7 +16,6 @@ import static java.util.Collections.emptyMap; import static java.util.Optional.ofNullable; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; import static org.apache.commons.lang3.StringUtils.defaultIfEmpty; import static org.apache.commons.lang3.StringUtils.isBlank; @@ -35,6 +34,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -227,7 +227,22 @@ private Map extractMessageFromRecord(ConsumerRecord extractHeaders(ConsumerRecord record) { - return Stream.of(record.headers().toArray()).distinct().collect(toMap(Header::key, header -> new String(header.value(), UTF_8))); + var result = new HashMap(); + Stream
distinctHeaders = Stream.of(record.headers().toArray()).distinct(); + distinctHeaders.forEach(header -> { + if (result.containsKey(header.key())) { + Object v = result.get(header.key()); + if (v instanceof String) { + var list_value = new ArrayList<>(); + list_value.add(v); + result.put(header.key(), list_value); + } + ((Collection) result.get(header.key())).add(new String(header.value(), UTF_8)); + } else { + result.put(header.key(), new String(header.value(), UTF_8)); + } + }); + return result; } private ConcurrentMessageListenerContainer createMessageListenerContainer() { From a6b94433593f7d41993377d1785789ae96afe5c8 Mon Sep 17 00:00:00 2001 From: boddissattva <58807088+boddissattva@users.noreply.github.com> Date: Tue, 10 Dec 2024 11:33:47 +0100 Subject: [PATCH 3/4] chroe(action): review - clean variables names --- .../action/kafka/KafkaBasicConsumeAction.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/chutney/action-impl/src/main/java/com/chutneytesting/action/kafka/KafkaBasicConsumeAction.java b/chutney/action-impl/src/main/java/com/chutneytesting/action/kafka/KafkaBasicConsumeAction.java index 778daee33..939cf04cd 100644 --- a/chutney/action-impl/src/main/java/com/chutneytesting/action/kafka/KafkaBasicConsumeAction.java +++ b/chutney/action-impl/src/main/java/com/chutneytesting/action/kafka/KafkaBasicConsumeAction.java @@ -230,16 +230,17 @@ private Map extractHeaders(ConsumerRecord record var result = new HashMap(); Stream
distinctHeaders = Stream.of(record.headers().toArray()).distinct(); distinctHeaders.forEach(header -> { - if (result.containsKey(header.key())) { - Object v = result.get(header.key()); - if (v instanceof String) { - var list_value = new ArrayList<>(); - list_value.add(v); - result.put(header.key(), list_value); + String headerKey = header.key(); + if (result.containsKey(headerKey)) { + Object headerValue = result.get(headerKey); + if (headerValue instanceof String) { + var headerValueAsList = new ArrayList<>(); + headerValueAsList.add(headerValue); + result.put(headerKey, headerValueAsList); } - ((Collection) result.get(header.key())).add(new String(header.value(), UTF_8)); + ((Collection) result.get(headerKey)).add(new String(header.value(), UTF_8)); } else { - result.put(header.key(), new String(header.value(), UTF_8)); + result.put(headerKey, new String(header.value(), UTF_8)); } }); return result; From ba6614e4a838b19964d5c1f8253b7a2ce4cd5565 Mon Sep 17 00:00:00 2001 From: BDA <58807088+boddissattva@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:51:09 +0100 Subject: [PATCH 4/4] review --- .../action/kafka/KafkaBasicConsumeActionTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionTest.java b/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionTest.java index 2eaccb76c..587dad0aa 100644 --- a/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionTest.java +++ b/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionTest.java @@ -200,7 +200,7 @@ public void should_consume_simple_text_message() { } @Test - public void should_consume_simple_text_message_with_multiple_values_header() { + public void should_consume_simple_text_message_with_multiple_values_for_the_same_header() { // Given ImmutableList
headers = ImmutableList.of( new RecordHeader("key-with-multiple-values", "value 1".getBytes()), @@ -385,10 +385,10 @@ public void should_select_message_whose_headers_match_given_payload_jsonpath_sel ImmutableList
headers = ImmutableList.of( new RecordHeader("X-Custom-HeaderKey", "X-Custom-HeaderValue".getBytes()), new RecordHeader("header", "123".getBytes()), - new RecordHeader("multi-values-header", "value 1".getBytes()), - new RecordHeader("multi-values-header", "value 2".getBytes()) + new RecordHeader("key-with-multiple-values", "value 1".getBytes()), + new RecordHeader("key-with-multiple-values", "value 2".getBytes()) ); - Action action = givenKafkaConsumeAction(3, null, "$..[?($.header=='123' && $.multi-values-header contains 'value 1')]", null, null); + Action action = givenKafkaConsumeAction(3, null, "$..[?($.header=='123' && $.key-with-multiple-values contains 'value 1')]", null, null); String textMessageToSelect = "first text message"; String xmlMessageToSelect = "first xml message"; String jsonMessageToSelect = "first json message";