Skip to content

Commit

Permalink
GH-192: Add legacyEmbeddedHeadersFormat
Browse files Browse the repository at this point in the history
Fixes: #192

* Expose `spring.cloud.stream.kinesis.binder.legacy-embedded-headers-format` configuration property
(`false` by default)
* Add logic into a `LegacyEmbeddedHeadersSupportBytesMessageMapper` to serialize headers via `EmbeddedHeaderUtils`
when `legacyEmbeddedHeadersFormat == true`
* Use `LegacyEmbeddedHeadersSupportBytesMessageMapper` on producer endpoints as well
* Change `KinesisBinderObservationTests` configuration to verify legacy headers support
  • Loading branch information
artembilan committed May 18, 2023
1 parent 56d6212 commit 8ff26e8
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.support.management.IntegrationManagement;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -115,7 +114,7 @@ public class KinesisMessageChannelBinder extends

private final DynamoDbAsyncClient dynamoDBClient;

private final String[] headersToEmbed;
private final LegacyEmbeddedHeadersSupportBytesMessageMapper embeddedHeadersMapper;

private KinesisExtendedBindingProperties extendedBindingProperties = new KinesisExtendedBindingProperties();

Expand Down Expand Up @@ -147,7 +146,10 @@ public KinesisMessageChannelBinder(KinesisBinderConfigurationProperties configur
this.dynamoDBClient = dynamoDBClient;
this.awsCredentialsProvider = awsCredentialsProvider;

this.headersToEmbed = headersToMap(configurationProperties);
this.embeddedHeadersMapper =
new LegacyEmbeddedHeadersSupportBytesMessageMapper(
configurationProperties.isLegacyEmbeddedHeadersFormat(),
headersToMap(configurationProperties));
}

public void setExtendedBindingProperties(KinesisExtendedBindingProperties extendedBindingProperties) {
Expand Down Expand Up @@ -263,7 +265,7 @@ private AbstractAwsMessageHandler<?> createKinesisMessageHandler(ProducerDestina
messageHandler.setStream(destination.getName());
messageHandler.setPartitionKeyExpression(partitionKeyExpression);
if (embedHeaders) {
messageHandler.setEmbeddedHeadersMapper(new EmbeddedJsonHeadersMessageMapper(this.headersToEmbed));
messageHandler.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
}
return messageHandler;
}
Expand All @@ -275,7 +277,7 @@ private AbstractAwsMessageHandler<?> createKplMessageHandler(ProducerDestination
messageHandler.setStream(destination.getName());
messageHandler.setPartitionKeyExpression(partitionKeyExpression);
if (embedHeaders) {
messageHandler.setEmbeddedHeadersMapper(new EmbeddedJsonHeadersMessageMapper(this.headersToEmbed));
messageHandler.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
}
return messageHandler;
}
Expand Down Expand Up @@ -403,7 +405,7 @@ private MessageProducerSupport createKclConsumerEndpoint(ConsumerDestination des
adapter.setConsumerBackoff(kinesisConsumerProperties.getConsumerBackoff());
adapter.setListenerMode(kinesisConsumerProperties.getListenerMode());
if (properties.getExtension().isEmbedHeaders()) {
adapter.setEmbeddedHeadersMapper(new LegacyEmbeddedHeadersSupportBytesMessageMapper());
adapter.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
}

if (properties.isUseNativeDecoding()) {
Expand Down Expand Up @@ -496,7 +498,7 @@ else if (shardId != null) {

adapter.setListenerMode(kinesisConsumerProperties.getListenerMode());
if (properties.getExtension().isEmbedHeaders()) {
adapter.setEmbeddedHeadersMapper(new LegacyEmbeddedHeadersSupportBytesMessageMapper());
adapter.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
}

if (properties.isUseNativeDecoding()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
* The {@link BytesMessageMapper} implementation to support an embedded headers
Expand All @@ -41,9 +42,16 @@
*/
class LegacyEmbeddedHeadersSupportBytesMessageMapper implements BytesMessageMapper {

private final EmbeddedJsonHeadersMessageMapper delegate = new EmbeddedJsonHeadersMessageMapper();
private final EmbeddedJsonHeadersMessageMapper delegate;

LegacyEmbeddedHeadersSupportBytesMessageMapper() {
private final String[] headersToEmbed;

private final boolean legacyEmbeddedHeadersFormat;

LegacyEmbeddedHeadersSupportBytesMessageMapper(boolean legacyEmbeddedHeadersFormat, String[] headersToEmbed) {
this.legacyEmbeddedHeadersFormat = legacyEmbeddedHeadersFormat;
this.delegate = new EmbeddedJsonHeadersMessageMapper(headersToEmbed);
this.headersToEmbed = headersToEmbed;
}

@Override
Expand Down Expand Up @@ -72,8 +80,18 @@ public Message<?> toMessage(byte[] payload, @Nullable Map<String, Object> header

@Override
public byte[] fromMessage(Message<?> message) {
throw new UnsupportedOperationException(
"The new embedded headers format is produced via 'EmbeddedJsonHeadersMessageMapper'");
if (this.legacyEmbeddedHeadersFormat) {
MessageValues transformed = new MessageValues(message);
Object contentType = transformed.get(MessageHeaders.CONTENT_TYPE);
// transform content type headers to String, so that they can be properly embedded in JSON
if (contentType != null) {
transformed.put(MessageHeaders.CONTENT_TYPE, contentType.toString());
}
return EmbeddedHeaderUtils.embedHeaders(transformed, this.headersToEmbed);
}
else {
return this.delegate.fromMessage(message);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public class KinesisBinderConfigurationProperties {
*/
private boolean enableObservation;

/**
* Enable serializing Kinesis records with legacy embedded headers format.
*/
private boolean legacyEmbeddedHeadersFormat;

private final Checkpoint checkpoint = new Checkpoint();

private final Locks locks = new Locks();
Expand Down Expand Up @@ -136,6 +141,14 @@ public void setEnableObservation(boolean enableObservation) {
this.enableObservation = enableObservation;
}

public boolean isLegacyEmbeddedHeadersFormat() {
return this.legacyEmbeddedHeadersFormat;
}

public void setLegacyEmbeddedHeadersFormat(boolean legacyEmbeddedHeadersFormat) {
this.legacyEmbeddedHeadersFormat = legacyEmbeddedHeadersFormat;
}

/**
* The checkpoint DynamoDB table configuration properties.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"spring.cloud.stream.bindings.kinesisConsumer-in-0.destination=" + KinesisBinderObservationTests.KINESIS_STREAM,
"spring.cloud.stream.bindings.kinesisConsumer-in-0.group=observation-group",
"spring.cloud.stream.kinesis.binder.enable-observation=true",
"spring.cloud.stream.kinesis.binder.legacy-embedded-headers-format=true",
"logging.level.org.springframework.cloud.stream.binder.kinesis.observation=debug",
"management.tracing.sampling.probability=1.0",
"spring.cloud.aws.region.static=eu-west-2"})
Expand Down

0 comments on commit 8ff26e8

Please sign in to comment.