Skip to content

Commit

Permalink
GH-9558: Expose BarrierSpec.discardChannel & triggerTimeout
Browse files Browse the repository at this point in the history
Fixes: #9558
Issue link: #9558

(cherry picked from commit e1cebaf)
  • Loading branch information
artembilan authored and spring-builds committed Oct 18, 2024
1 parent beed81d commit 83db2bb
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
import org.springframework.integration.handler.DiscardingMessageHandler;
import org.springframework.integration.handler.MessageTriggerAction;
import org.springframework.integration.store.SimpleMessageGroup;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
Expand Down Expand Up @@ -170,7 +171,8 @@ public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGr
}

/**
* Set the name of the channel to which late arriving trigger messages are sent.
* Set the name of the channel to which late arriving trigger messages are sent,
* or request message does not arrive in time.
* @param discardChannelName the discard channel.
* @since 5.0
*/
Expand All @@ -179,7 +181,8 @@ public void setDiscardChannelName(String discardChannelName) {
}

/**
* Set the channel to which late arriving trigger messages are sent.
* Set the channel to which late arriving trigger messages are sent,
* or request message does not arrive in time.
* @param discardChannel the discard channel.
* @since 5.0
*/
Expand All @@ -188,8 +191,11 @@ public void setDiscardChannel(MessageChannel discardChannel) {
}

/**
* Return the discard message channel for trigger action message.
* @return a discard message channel.
* @since 5.0
*/
@Nullable
@Override
public MessageChannel getDiscardChannel() {
String channelName = this.discardChannelName;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,8 @@
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;

/**
Expand All @@ -43,6 +45,15 @@ public class BarrierSpec extends ConsumerEndpointSpec<BarrierSpec, BarrierMessag
private CorrelationStrategy correlationStrategy =
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);

@Nullable
private MessageChannel discardChannel;

@Nullable
private String discardChannelName;

@Nullable
private Long triggerTimeout;

protected BarrierSpec(long timeout) {
super(null);
this.timeout = timeout;
Expand All @@ -60,9 +71,57 @@ public BarrierSpec correlationStrategy(CorrelationStrategy correlationStrategy)
return this;
}

/**
* Set the channel to which late arriving trigger messages are sent,
* or request message does not arrive in time.
* @param discardChannel the message channel for discarded triggers.
* @return the spec
* @since 6.2.10
*/
public BarrierSpec discardChannel(@Nullable MessageChannel discardChannel) {
this.discardChannel = discardChannel;
return this;
}

/**
* Set the channel bean name to which late arriving trigger messages are sent,
* or request message does not arrive in time.
* @param discardChannelName the message channel for discarded triggers.
* @return the spec
* @since 6.2.10
*/
public BarrierSpec discardChannel(@Nullable String discardChannelName) {
this.discardChannelName = discardChannelName;
return this;
}

/**
* Set the timeout in milliseconds when waiting for a request message.
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
* @return the spec
* @since 6.2.10
*/
public BarrierSpec triggerTimeout(long triggerTimeout) {
this.triggerTimeout = triggerTimeout;
return this;
}

@Override
public Tuple2<ConsumerEndpointFactoryBean, BarrierMessageHandler> doGet() {
this.handler = new BarrierMessageHandler(this.timeout, this.outputProcessor, this.correlationStrategy);
if (this.triggerTimeout == null) {
this.handler = new BarrierMessageHandler(this.timeout, this.outputProcessor, this.correlationStrategy);
}
else {
this.handler =
new BarrierMessageHandler(this.timeout, this.triggerTimeout, this.outputProcessor,
this.correlationStrategy);
}
if (this.discardChannel != null) {
this.handler.setDiscardChannel(this.discardChannel);
}
else if (this.discardChannelName != null) {
this.handler.setDiscardChannelName(this.discardChannelName);
}
return super.doGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ public IntegrationFlow barrierFlow() {
return f -> f
.barrier(10000, b -> b
.correlationStrategy(new HeaderAttributeCorrelationStrategy(BARRIER))
.discardChannel("nullChannel")
.outputProcessor(g ->
g.getMessages()
.stream()
Expand Down
1 change: 1 addition & 0 deletions src/reference/antora/modules/ROOT/pages/barrier.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,6 @@ XML::
Depending on which one has a message arrive first, either the thread sending a message to `in` or the thread sending a message to `release` waits for up to ten seconds until the other message arrives.
When the message is released, the `out` channel is sent a message that combines the result of invoking the custom `MessageGroupProcessor` bean, named `myOutputProcessor`.
If the main thread times out and a trigger arrives later, you can configure a discard channel to which the late trigger is sent.
The trigger message is also discarded if request message does not arrive in time.

For an example of this component, see the https://github.com/spring-projects/spring-integration-samples/tree/main/basic/barrier[barrier sample application].

0 comments on commit 83db2bb

Please sign in to comment.