From 384591069e3ae6f07906f6f6bac866a7e7fdc708 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sun, 16 Feb 2025 17:39:34 -0600 Subject: [PATCH] Allow pulsar metadata headers to be mapped outbound This allows pulsar metadata headers, which are excluded on the outbound messages by default, to be included in the outbound message headers. Resolves #1037 --- .../ROOT/pages/reference/pulsar-header.adoc | 4 +- .../header/AbstractPulsarHeaderMapper.java | 38 +++++++++---------- .../AbstractPulsarHeaderMapperTests.java | 16 +++++++- .../ToStringPulsarHeaderMapperTests.java | 5 +-- 4 files changed, 36 insertions(+), 27 deletions(-) diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc index f48446e3a..c345caa6d 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar-header.adoc @@ -63,9 +63,9 @@ static class PulsarHeadersCustomObjectMapperTestConfig { === Inbound/Outbound Patterns On the inbound side, by default, all Pulsar headers (message metadata plus user properties) are mapped to `MessageHeaders`. -On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that represent the Pulsar message metadata. +On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that represent the Pulsar message metadata (i.e. the headers that are prefixed with `pulsar_message_`). You can specify which headers are mapped for inbound and outbound messages by configuring the `inboundPatterns` and `outboundPatterns` on a mapper bean you provide. - +You can include Pulsar message metadata headers on the outbound messages by adding the exact header name to the `outboundPatterns` as patterns are not supported for metadata headers. Patterns are rather simple and can contain a leading wildcard (`\*`), a trailing wildcard, or both (for example, `*.cat.*`). You can negate patterns with a leading `!`. The first pattern that matches a header name (whether positive or negative) wins. diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapper.java b/spring-pulsar/src/main/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapper.java index cd2090bb9..0bce8c61f 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapper.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapper.java @@ -55,6 +55,13 @@ public abstract class AbstractPulsarHeaderMapper NEVER_MATCH_OUTBOUND_INTERNAL_HEADERS = List.of(PulsarHeaders.KEY, + PulsarHeaders.KEY_BYTES, PulsarHeaders.ORDERING_KEY, PulsarHeaders.INDEX, PulsarHeaders.MESSAGE_ID, + PulsarHeaders.BROKER_PUBLISH_TIME, PulsarHeaders.EVENT_TIME, PulsarHeaders.MESSAGE_SIZE, + PulsarHeaders.PRODUCER_NAME, PulsarHeaders.RAW_DATA, PulsarHeaders.PUBLISH_TIME, + PulsarHeaders.REDELIVERY_COUNT, PulsarHeaders.REPLICATED_FROM, PulsarHeaders.SCHEMA_VERSION, + PulsarHeaders.SEQUENCE_ID, PulsarHeaders.TOPIC_NAME); + protected final LogAccessor logger = new LogAccessor(this.getClass()); private final List inboundMatchers = new ArrayList<>(); @@ -64,10 +71,9 @@ public abstract class AbstractPulsarHeaderMapper - * NOTE: Internal framework headers are never mapped - * outbound. By default, the {@code "id"} and {@code "timestamp"} headers are also - * excluded from outbound mapping but can be included by adding them to - * {@code outboundPatterns}. + * NOTE: By default, internal framework headers and the {@code "id"} + * and {@code "timestamp"} headers are not mapped outbound but can be + * included by adding them to {@code outboundPatterns}. *

* NOTE: The patterns are applied in order, stopping on the first * match (positive or negative). When no pattern is specified, the {@code "*"} pattern @@ -84,23 +90,7 @@ public AbstractPulsarHeaderMapper(List inboundPatterns, List out Objects.requireNonNull(outboundPatterns, "outboundPatterns must be specified"); inboundPatterns.forEach((p) -> this.inboundMatchers.add(PatternMatch.fromPatternString(p))); // @formatter:off - this.outboundMatchers.add(new NeverMatch( - PulsarHeaders.KEY, - PulsarHeaders.KEY_BYTES, - PulsarHeaders.ORDERING_KEY, - PulsarHeaders.INDEX, - PulsarHeaders.MESSAGE_ID, - PulsarHeaders.BROKER_PUBLISH_TIME, - PulsarHeaders.EVENT_TIME, - PulsarHeaders.MESSAGE_SIZE, - PulsarHeaders.PRODUCER_NAME, - PulsarHeaders.RAW_DATA, - PulsarHeaders.PUBLISH_TIME, - PulsarHeaders.REDELIVERY_COUNT, - PulsarHeaders.REPLICATED_FROM, - PulsarHeaders.SCHEMA_VERSION, - PulsarHeaders.SEQUENCE_ID, - PulsarHeaders.TOPIC_NAME)); + this.outboundMatchers.add(getNeverMatch(outboundPatterns)); // @formatter:on if (outboundPatterns.isEmpty()) { this.outboundMatchers.add(EXCLUDE_PATTERN_ID); @@ -114,6 +104,12 @@ public AbstractPulsarHeaderMapper(List inboundPatterns, List out } } + private NeverMatch getNeverMatch(List outboundPatterns) { + List neverMatches = new ArrayList<>(NEVER_MATCH_OUTBOUND_INTERNAL_HEADERS); + neverMatches.removeAll(outboundPatterns); + return new NeverMatch(neverMatches.toArray(new String[0])); + } + @Override public Map toPulsarHeaders(MessageHeaders springHeaders) { Objects.requireNonNull(springHeaders, "springHeaders must not be null"); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapperTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapperTests.java index 9bea1bf39..1466c3095 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapperTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/support/header/AbstractPulsarHeaderMapperTests.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Objects; import org.apache.pulsar.client.api.Message; @@ -206,6 +207,19 @@ void ensureCallbacksInvoked() { verify(spyTestMapper, times(springHeaders.size())).matchesForOutbound(anyString()); } + @Test + void neverMatchFiltersCanBeConfigured() { + var mapper = mapperWithOutboundPatterns(PulsarHeaders.KEY, PulsarHeaders.MESSAGE_ID, + PulsarHeaders.PRODUCER_NAME, "noSuchInternalHeader"); + var springHeaders = new HashMap(); + springHeaders.put(PulsarHeaders.KEY, "testKey"); + springHeaders.put(PulsarHeaders.KEY_BYTES, "testKeyBytes"); + springHeaders.put(PulsarHeaders.MESSAGE_ID, "testMsg"); + springHeaders.put(PulsarHeaders.PRODUCER_NAME, "testProducer"); + assertThat(mapper.toPulsarHeaders(new MessageHeaders(springHeaders))).containsOnlyKeys(PulsarHeaders.KEY, + PulsarHeaders.MESSAGE_ID, PulsarHeaders.PRODUCER_NAME); + } + } @Nested @@ -249,7 +263,7 @@ static class TestPulsarHeaderMapper extends AbstractPulsarHeaderMapper mapperWithInboundPatterns(String... patterns) { - return new ToStringPulsarHeaderMapper(List.of(patterns), Collections.emptyList()); + return new ToStringPulsarHeaderMapper(List.of(patterns), List.of()); } @Override AbstractPulsarHeaderMapper mapperWithOutboundPatterns(String... patterns) { - return new ToStringPulsarHeaderMapper(Collections.emptyList(), List.of(patterns)); + return new ToStringPulsarHeaderMapper(List.of(), List.of(patterns)); } @Test