Skip to content

Commit

Permalink
GH-2155: DeadLetterPublishingRecoverer Improvement
Browse files Browse the repository at this point in the history
Resolves #2155

Co-authored-by: Viacheslav Chernyshev <[email protected]>

- Add a `BitSet` property to suppress individual standard headers
- Support multiple `headersFunction`
- Allow complete customization of exception headers
- Add `setHeadersFunction` to the DLPR factory for retryable topics

* Fix typo in doc.

* Rework enum usage; other refactoring; add getters to `HeaderNames`.

* Fix javadocs.

* Doc polishing.

* Change BitSet to EnumSet.

* Fix bogus import.

* Defend against immutable headers; only create new when detected.

* Fix NOSONAR typo.

**Cherry-pick to `2.8.x`**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java
  • Loading branch information
garyrussell authored and artembilan committed Mar 10, 2022
1 parent a1ca194 commit f3f3eac
Show file tree
Hide file tree
Showing 7 changed files with 689 additions and 36 deletions.
30 changes: 29 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5578,7 +5578,7 @@ Key exceptions are only caused by `DeserializationException` s so there is no `D
There are two mechanisms to add more headers.

1. Subclass the recoverer and override `createProducerRecord()` - call `super.createProducerRecord()` and add more headers.
2. Provide a `BiFunction` to receive the consumer record and exception, returning a `Headers` object; headers from there will be copied to the final producer record.
2. Provide a `BiFunction` to receive the consumer record and exception, returning a `Headers` object; headers from there will be copied to the final producer record; also see <<dlpr-headers>>.
Use `setHeadersFunction()` to set the `BiFunction`.

The second is simpler to implement but the first has more information available, including the already assembled standard headers.
Expand Down Expand Up @@ -5673,6 +5673,34 @@ The reason for the two properties is because, while you might want to retain onl

`appendOriginalHeaders` is applied to all headers named `*ORIGINAL*` while `stripPreviousExceptionHeaders` is applied to all headers named `*EXCEPTION*`.

Starting with version 2.8.4, you now can control which of the standard headers will be added to the output record.
See the `enum HeadersToAdd` for the generic names of the (currently) 10 standard headers that are added by default (these are not the actual header names, just an abstraction; the actual header names are set up by the `getHeaderNames()` method which subclasses can override.

To exclude headers, use the `excludeHeaders()` method; for example, to suppress adding the exception stack trace in a header, use:

====
[source, java]
----
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
----
====

In addition, you can completely customize the addition of exception headers by adding an `ExceptionHeadersCreator`; this also disables all standard exception headers.

====
[source, java]
----
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
----
====

Also starting with version 2.8.4, you can now provide multiple headers functions, via the `addHeadersFunction` method.
This allows additional functions to apply, even if another function has already been registered, for example, when using <<retry-topic>>.

Also see <<retry-headers>> with <<retry-topic>>.

[[exp-backoff]]
Expand Down
2 changes: 2 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver)
----
====

Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory, you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) -> { ... })`

[[retry-topic-combine-blocking]]
==== Combining blocking and non-blocking retries

Expand Down
2 changes: 2 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ See <<delegating-serialization>> for more information.

The property `stripPreviousExceptionHeaders` is now `true` by default.

There are now several techniques to customize which headers are added to the output record.

See <<dlpr-headers>> for more information.

[[x28-retryable-topics-changes]]
Expand Down
Loading

0 comments on commit f3f3eac

Please sign in to comment.