diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java index 82a184f..20ad901 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java @@ -21,6 +21,9 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer; @@ -174,6 +177,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport private long pollingIdleTime = 1500L; + private long gracefulShutdownTimeout; + public KclMessageDrivenChannelAdapter(String... streams) { this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams); } @@ -393,6 +398,17 @@ public void setPollingIdleTime(long pollingIdleTime) { this.pollingIdleTime = pollingIdleTime; } + /** + * The timeout for {@link Scheduler#startGracefulShutdown()}. + * Defaults to {@code 0} with the meaning to call {@link Scheduler#shutdown()}. + * @param gracefulShutdownTimeout the timeout for {@link Scheduler#startGracefulShutdown()}. + * @since 3.0.8 + * @see Scheduler#startGracefulShutdown() + */ + public void setGracefulShutdownTimeout(long gracefulShutdownTimeout) { + this.gracefulShutdownTimeout = gracefulShutdownTimeout; + } + @Override protected void onInit() { super.onInit(); @@ -494,14 +510,25 @@ protected void doStart() { @Override protected void doStop() { super.doStop(); - this.scheduler.shutdown(); + if (this.gracefulShutdownTimeout == 0) { + this.scheduler.shutdown(); + } + else { + try { + logger.info("Start graceful shutdown for KCL..."); + this.scheduler.startGracefulShutdown().get(this.gracefulShutdownTimeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException | ExecutionException | TimeoutException ex) { + throw new RuntimeException("Graceful shutdown for KCL has failed.", ex); + } + } } @Override public void destroy() { super.destroy(); if (isRunning()) { - this.scheduler.shutdown(); + stop(); } } diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java index 8571300..35198ae 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java @@ -194,6 +194,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() { adapter.setBindSourceRecord(true); adapter.setEmptyRecordList(true); adapter.setPollingMaxRecords(99); + adapter.setGracefulShutdownTimeout(100); return adapter; }