Use the aggregator
application to combine multiple messages into one, based on some correlation mechanism.
This processor is fully based on the Aggregator component from Spring Integration. So, please, consult there for use-cases and functionality.
If the aggregation and correlation logic is based on the default strategies, the correlationId
, sequenceNumber
and sequenceSize
headers must be presented in the incoming message.
Aggregator Processor is fully based on the Spring Integration’s AggregatingMessageHandler
and since correlation and aggregation logic don’t require particular types, the input payload can be anything able to be transferred over the network and Spring Cloud Stream Binder.
If payload is JSON, the JsonPropertyAccessor
helps to build straightforward SpEL expressions for correlation, release and aggregation functions.
Returns all headers of the incoming messages that have no conflicts among the group. An absent header on one or more messages within the group is not considered a conflict.
The aggregator processor has the following options:
- aggregator.aggregation
-
SpEL expression for aggregation strategy. Default is collection of payloads (Expression, default:
<none>
) - aggregator.correlation
-
SpEL expression for correlation key. Default to correlationId header (Expression, default:
<none>
) - aggregator.group-timeout
-
SpEL expression for timeout to expiring uncompleted groups (Expression, default:
<none>
) - aggregator.message-store-entity
-
Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc (String, default:
<none>
) - aggregator.message-store-type
-
Message store type (String, default:
<none>
) - aggregator.release
-
SpEL expression for release strategy. Default is based on the sequenceSize header (Expression, default:
<none>
) - spring.data.mongodb.authentication-database
-
Authentication database name. (String, default:
<none>
) - spring.data.mongodb.database
-
Database name. (String, default:
<none>
) - spring.data.mongodb.field-naming-strategy
-
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default:
<none>
) - spring.data.mongodb.grid-fs-database
-
GridFS database name. (String, default:
<none>
) - spring.data.mongodb.host
-
Mongo server host. Cannot be set with URI. (String, default:
<none>
) - spring.data.mongodb.password
-
Login password of the mongo server. Cannot be set with URI. (Character[], default:
<none>
) - spring.data.mongodb.port
-
Mongo server port. Cannot be set with URI. (Integer, default:
<none>
) - spring.data.mongodb.uri
-
Mongo database URI. Cannot be set with host, port and credentials. (String, default:
mongodb://localhost/test
) - spring.data.mongodb.username
-
Login user of the mongo server. Cannot be set with URI. (String, default:
<none>
) - spring.datasource.continue-on-error
-
Whether to stop if an error occurs while initializing the database. (Boolean, default:
false
) - spring.datasource.data
-
Data (DML) script resource references. (List<String>, default:
<none>
) - spring.datasource.data-password
-
Password of the database to execute DML scripts (if different). (String, default:
<none>
) - spring.datasource.data-username
-
Username of the database to execute DML scripts (if different). (String, default:
<none>
) - spring.datasource.driver-class-name
-
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - spring.datasource.generate-unique-name
-
Whether to generate a random datasource name. (Boolean, default:
false
) - spring.datasource.initialization-mode
-
Initialize the datasource with available DDL and DML scripts. (DataSourceInitializationMode, default:
embedded
, possible values:ALWAYS
,EMBEDDED
,NEVER
) - spring.datasource.jndi-name
-
JNDI location of the datasource. Class, url, username & password are ignored when set. (String, default:
<none>
) - spring.datasource.name
-
Name of the datasource. Default to "testdb" when using an embedded database. (String, default:
<none>
) - spring.datasource.password
-
Login password of the database. (String, default:
<none>
) - spring.datasource.platform
-
Platform to use in the DDL or DML scripts (such as schema-${platform}.sql or data-${platform}.sql). (String, default:
all
) - spring.datasource.schema
-
Schema (DDL) script resource references. (List<String>, default:
<none>
) - spring.datasource.schema-password
-
Password of the database to execute DDL scripts (if different). (String, default:
<none>
) - spring.datasource.schema-username
-
Username of the database to execute DDL scripts (if different). (String, default:
<none>
) - spring.datasource.separator
-
Statement separator in SQL initialization scripts. (String, default:
;
) - spring.datasource.sql-script-encoding
-
SQL scripts encoding. (Charset, default:
<none>
) - spring.datasource.type
-
Fully qualified name of the connection pool implementation to use. By default, it is auto-detected from the classpath. (Class<DataSource>, default:
<none>
) - spring.datasource.url
-
JDBC URL of the database. (String, default:
<none>
) - spring.datasource.username
-
Login username of the database. (String, default:
<none>
) - spring.mongodb.embedded.features
-
Comma-separated list of features to enable. Uses the defaults of the configured version by default. (Set<Feature>, default:
[sync_delay]
) - spring.mongodb.embedded.version
-
Version of Mongo to use. (String, default:
3.5.5
) - spring.redis.database
-
Database index used by the connection factory. (Integer, default:
0
) - spring.redis.host
-
Redis server host. (String, default:
localhost
) - spring.redis.password
-
Login password of the redis server. (String, default:
<none>
) - spring.redis.port
-
Redis server port. (Integer, default:
6379
) - spring.redis.ssl
-
Whether to enable SSL support. (Boolean, default:
false
) - spring.redis.timeout
-
Connection timeout. (Duration, default:
<none>
) - spring.redis.url
-
Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:[email protected]:6379 (String, default:
<none>
)
By default the aggregator
processor uses:
- HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
- for correlation
;
- SequenceSizeReleaseStrategy
- for release
;
- DefaultAggregatingMessageGroupProcessor
- for aggregation
;
- SimpleMessageStore
- for messageStoreType
.
The aggregator
application can be configured for persistent MessageGroupStore
implementations.
The configuration for target technology is fully based on the Spring Boot auto-configuration.
But default JDBC, MongoDb and Redis auto-configurations are excluded.
They are @Import
ed basing on the aggregator.messageStoreType
configuration property.
Consult Spring Boot Reference Manual for auto-configuration for particular technology you use for aggregator
.
The JDBC JdbcMessageStore
requires particular tables in the target data base.
You can find schema scripts for appropriate RDBMS vendors in the org.springframework.integration.jdbc
package of the spring-integration-jdbc
jar.
Those scripts can be used for automatic data base initialization via Spring Boot.
For example:
java -jar aggregator-rabbit-1.0.0.RELEASE.jar
--aggregator.message-store-type=jdbc
--spring.datasource.url=jdbc:h2:mem:test
--spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar aggregator_processor.jar
--aggregator.message-store-type=jdbc
--spring.datasource.url=jdbc:h2:mem:test
--spring.datasource.schema=org/springframework/integration/jdbc/schema-h2.sql
java -jar aggregator_processor.jar
--spring.data.mongodb.port=0
--aggregator.correlation=T(Thread).currentThread().id
--aggregator.release="!#this.?[payload == 'bar'].empty"
--aggregator.aggregation="#this.?[payload == 'foo'].![payload]"
--aggregator.message-store-type=mongodb
--aggregator.message-store-entity=aggregatorTest
This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please report unacceptable behavior to [email protected].