Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow pulsar metadata headers to be mapped outbound #1038

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ static class PulsarHeadersCustomObjectMapperTestConfig {

=== Inbound/Outbound Patterns
On the inbound side, by default, all Pulsar headers (message metadata plus user properties) are mapped to `MessageHeaders`.
On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that represent the Pulsar message metadata.
On the outbound side, by default, all `MessageHeaders` are mapped, except `id`, `timestamp`, and the headers that represent the Pulsar message metadata (i.e. the headers that are prefixed with `pulsar_message_`).
You can specify which headers are mapped for inbound and outbound messages by configuring the `inboundPatterns` and `outboundPatterns` on a mapper bean you provide.

You can include Pulsar message metadata headers on the outbound messages by adding the exact header name to the `outboundPatterns` as patterns are not supported for metadata headers.
Patterns are rather simple and can contain a leading wildcard (`\*`), a trailing wildcard, or both (for example, `*.cat.*`).
You can negate patterns with a leading `!`.
The first pattern that matches a header name (whether positive or negative) wins.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ public abstract class AbstractPulsarHeaderMapper<ToPulsarHeadersContextType, ToS

private static final PatternMatch EXCLUDE_PATTERN_TIMESTAMP = PatternMatch.fromPatternString("!timestamp");

private static final List<String> NEVER_MATCH_OUTBOUND_INTERNAL_HEADERS = List.of(PulsarHeaders.KEY,
PulsarHeaders.KEY_BYTES, PulsarHeaders.ORDERING_KEY, PulsarHeaders.INDEX, PulsarHeaders.MESSAGE_ID,
PulsarHeaders.BROKER_PUBLISH_TIME, PulsarHeaders.EVENT_TIME, PulsarHeaders.MESSAGE_SIZE,
PulsarHeaders.PRODUCER_NAME, PulsarHeaders.RAW_DATA, PulsarHeaders.PUBLISH_TIME,
PulsarHeaders.REDELIVERY_COUNT, PulsarHeaders.REPLICATED_FROM, PulsarHeaders.SCHEMA_VERSION,
PulsarHeaders.SEQUENCE_ID, PulsarHeaders.TOPIC_NAME);

protected final LogAccessor logger = new LogAccessor(this.getClass());

private final List<PulsarHeaderMatcher> inboundMatchers = new ArrayList<>();
Expand All @@ -64,10 +71,9 @@ public abstract class AbstractPulsarHeaderMapper<ToPulsarHeadersContextType, ToS
/**
* Construct a mapper that will match the supplied inbound and outbound patterns.
* <p>
* <strong>NOTE:</strong> Internal framework headers are <em>never</em> mapped
* outbound. By default, the {@code "id"} and {@code "timestamp"} headers are also
* excluded from outbound mapping but can be included by adding them to
* {@code outboundPatterns}.
* <strong>NOTE:</strong> By default, internal framework headers and the {@code "id"}
* and {@code "timestamp"} headers are <em>not</em> mapped outbound but can be
* included by adding them to {@code outboundPatterns}.
* <p>
* <strong>NOTE:</strong> The patterns are applied in order, stopping on the first
* match (positive or negative). When no pattern is specified, the {@code "*"} pattern
Expand All @@ -84,23 +90,7 @@ public AbstractPulsarHeaderMapper(List<String> inboundPatterns, List<String> out
Objects.requireNonNull(outboundPatterns, "outboundPatterns must be specified");
inboundPatterns.forEach((p) -> this.inboundMatchers.add(PatternMatch.fromPatternString(p)));
// @formatter:off
this.outboundMatchers.add(new NeverMatch(
PulsarHeaders.KEY,
PulsarHeaders.KEY_BYTES,
PulsarHeaders.ORDERING_KEY,
PulsarHeaders.INDEX,
PulsarHeaders.MESSAGE_ID,
PulsarHeaders.BROKER_PUBLISH_TIME,
PulsarHeaders.EVENT_TIME,
PulsarHeaders.MESSAGE_SIZE,
PulsarHeaders.PRODUCER_NAME,
PulsarHeaders.RAW_DATA,
PulsarHeaders.PUBLISH_TIME,
PulsarHeaders.REDELIVERY_COUNT,
PulsarHeaders.REPLICATED_FROM,
PulsarHeaders.SCHEMA_VERSION,
PulsarHeaders.SEQUENCE_ID,
PulsarHeaders.TOPIC_NAME));
this.outboundMatchers.add(getNeverMatch(outboundPatterns));
// @formatter:on
if (outboundPatterns.isEmpty()) {
this.outboundMatchers.add(EXCLUDE_PATTERN_ID);
Expand All @@ -114,6 +104,12 @@ public AbstractPulsarHeaderMapper(List<String> inboundPatterns, List<String> out
}
}

private NeverMatch getNeverMatch(List<String> outboundPatterns) {
List<String> neverMatches = new ArrayList<>(NEVER_MATCH_OUTBOUND_INTERNAL_HEADERS);
neverMatches.removeAll(outboundPatterns);
return new NeverMatch(neverMatches.toArray(new String[0]));
}

@Override
public Map<String, String> toPulsarHeaders(MessageHeaders springHeaders) {
Objects.requireNonNull(springHeaders, "springHeaders must not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;

import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -206,6 +207,19 @@ void ensureCallbacksInvoked() {
verify(spyTestMapper, times(springHeaders.size())).matchesForOutbound(anyString());
}

@Test
void neverMatchFiltersCanBeConfigured() {
var mapper = mapperWithOutboundPatterns(PulsarHeaders.KEY, PulsarHeaders.MESSAGE_ID,
PulsarHeaders.PRODUCER_NAME, "noSuchInternalHeader");
var springHeaders = new HashMap<String, Object>();
springHeaders.put(PulsarHeaders.KEY, "testKey");
springHeaders.put(PulsarHeaders.KEY_BYTES, "testKeyBytes");
springHeaders.put(PulsarHeaders.MESSAGE_ID, "testMsg");
springHeaders.put(PulsarHeaders.PRODUCER_NAME, "testProducer");
assertThat(mapper.toPulsarHeaders(new MessageHeaders(springHeaders))).containsOnlyKeys(PulsarHeaders.KEY,
PulsarHeaders.MESSAGE_ID, PulsarHeaders.PRODUCER_NAME);
}

}

@Nested
Expand Down Expand Up @@ -249,7 +263,7 @@ static class TestPulsarHeaderMapper extends AbstractPulsarHeaderMapper<String, S
private String toPulsarHeadersContext;

TestPulsarHeaderMapper(String toSpringHeadersContext, String toPulsarHeadersContext) {
super(Collections.emptyList(), Collections.emptyList());
super(List.of(), List.of());
this.toSpringHeadersContext = toSpringHeadersContext;
this.toPulsarHeadersContext = toPulsarHeadersContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.pulsar.support.header.PulsarHeaderMapperTestUtil.mockPulsarMessage;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -43,12 +42,12 @@ class ToStringPulsarHeaderMapperTests extends AbstractPulsarHeaderMapperTests {

@Override
AbstractPulsarHeaderMapper<?, ?> mapperWithInboundPatterns(String... patterns) {
return new ToStringPulsarHeaderMapper(List.of(patterns), Collections.emptyList());
return new ToStringPulsarHeaderMapper(List.of(patterns), List.of());
}

@Override
AbstractPulsarHeaderMapper<?, ?> mapperWithOutboundPatterns(String... patterns) {
return new ToStringPulsarHeaderMapper(Collections.emptyList(), List.of(patterns));
return new ToStringPulsarHeaderMapper(List.of(), List.of(patterns));
}

@Test
Expand Down