diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java index 4274c947b8..1d58496a64 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java @@ -157,6 +157,12 @@ public GcpPubSubAutoConfiguration( public ThreadPoolTaskScheduler pubsubPublisherThreadPool() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(this.gcpPubSubProperties.getPublisher().getExecutorThreads()); + scheduler.setAcceptTasksAfterContextClose( + this.gcpPubSubProperties.getPublisher().getExecutorAcceptTasksAfterContextClose()); + scheduler.setWaitForTasksToCompleteOnShutdown( + this.gcpPubSubProperties.getPublisher().getExecutorWaitForTasksToCompleteOnShutdown()); + scheduler.setAwaitTerminationMillis( + this.gcpPubSubProperties.getPublisher().getExecutorAwaitTerminationMillis()); scheduler.setThreadNamePrefix("gcp-pubsub-publisher"); scheduler.setDaemon(true); return scheduler; diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/PubSubConfiguration.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/PubSubConfiguration.java index 41e3f9375e..6934eced90 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/PubSubConfiguration.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/PubSubConfiguration.java @@ -338,6 +338,40 @@ public static class Publisher { /** Number of threads used by every publisher. */ private int executorThreads = 4; + /** + * Default {@code false}. Passed on to the underlying `ThreadPoolTaskScheduler` property + * `acceptTasksAfterContextClose`. With this set to {@code true}, makes the + * `ThreadPoolTaskScheduler` to accept further tasks after the `ContextClosedEvent`, with the + * expense of a longer shutdown phase. The scheduler will not go through a coordinated lifecycle + * stop phase but rather only stop the remaining tasks(with a hard interrupt) on its own + * shutdown. The interrupt on the blocked threads before the JVM shuts down letting them close + * in an orderly fashion. + */ + private Boolean executorAcceptTasksAfterContextClose = false; + + /** + * Default {@code false}. Passed on to the underlying `ThreadPoolTaskScheduler` property + * `waitForTasksToCompleteOnShutdown`. With this set to {@code true},(the common + * pre-spring-6.1.x behaviour) makes the `ThreadPoolTaskScheduler` to wait for scheduled tasks + * to complete on shutdown, not interrupting running tasks and executing all tasks in the queue, + * with the expense of a longer shutdown phase. The scheduler will not go through a coordinated + * lifecycle stop phase but rather only stop the tasks and wait for task completion on its own + * shutdown. This will not interrupt the running tasks, letting the JVM end and hard-stopping + * any remaining threads. + */ + private Boolean executorWaitForTasksToCompleteOnShutdown = false; + + /** + * Default 0. Passed on to the underlying `ThreadPoolTaskScheduler` property + * `awaitTerminationMillis`. This property sets the maximum number of milliseconds that the + * `ThreadPoolTaskScheduler`is supposed to block on shutdown in order to wait for remaining + * tasks to complete their execution before the rest of the container continues to shut down. + * This is particularly useful if the remaining tasks are likely to need access to other + * resources that are also managed by the container. With this property, scheduler will wait for + * the given time (max) for the termination of tasks. + */ + private Long executorAwaitTerminationMillis = 0L; + /** Retry properties. */ private final Retry retry = new Retry(); @@ -366,6 +400,32 @@ public void setExecutorThreads(int executorThreads) { this.executorThreads = executorThreads; } + public Boolean getExecutorAcceptTasksAfterContextClose() { + return this.executorAcceptTasksAfterContextClose; + } + + public void setExecutorAcceptTasksAfterContextClose( + Boolean executorAcceptTasksAfterContextClose) { + this.executorAcceptTasksAfterContextClose = executorAcceptTasksAfterContextClose; + } + + public Boolean getExecutorWaitForTasksToCompleteOnShutdown() { + return this.executorWaitForTasksToCompleteOnShutdown; + } + + public void setExecutorWaitForTasksToCompleteOnShutdown( + Boolean executorWaitForTasksToCompleteOnShutdown) { + this.executorWaitForTasksToCompleteOnShutdown = executorWaitForTasksToCompleteOnShutdown; + } + + public long getExecutorAwaitTerminationMillis() { + return this.executorAwaitTerminationMillis; + } + + public void setExecutorAwaitTerminationMillis(long executorAwaitTerminationMillis) { + this.executorAwaitTerminationMillis = executorAwaitTerminationMillis; + } + public Boolean getEnableMessageOrdering() { return enableMessageOrdering; } diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/core/PubSubConfigurationTests.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/core/PubSubConfigurationTests.java index 15e0249bfc..0caffb8598 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/core/PubSubConfigurationTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/core/PubSubConfigurationTests.java @@ -543,6 +543,9 @@ void testDefaultPublisherProperties() { pubSubConfiguration.initialize("projectId"); assertThat(publisher.getExecutorThreads()).isEqualTo(4); + assertThat(publisher.getExecutorAcceptTasksAfterContextClose()).isFalse(); + assertThat(publisher.getExecutorWaitForTasksToCompleteOnShutdown()).isFalse(); + assertThat(publisher.getExecutorAwaitTerminationMillis()).isEqualTo(0L); assertThat(publisher.getEnableMessageOrdering()).isNull(); assertThat(publisher.getEndpoint()).isNull(); assertThat(batching.getElementCountThreshold()).isNull(); @@ -565,12 +568,18 @@ void testDefaultPublisherProperties() { @Test void testPublisherProperties() { publisher.setExecutorThreads(5); + publisher.setExecutorAcceptTasksAfterContextClose(true); + publisher.setExecutorWaitForTasksToCompleteOnShutdown(true); + publisher.setExecutorAwaitTerminationMillis(30000); publisher.setEnableMessageOrdering(true); publisher.setEndpoint("fake-endpoint"); pubSubConfiguration.initialize("projectId"); assertThat(publisher.getExecutorThreads()).isEqualTo(5); + assertThat(publisher.getExecutorAcceptTasksAfterContextClose()).isTrue(); + assertThat(publisher.getExecutorWaitForTasksToCompleteOnShutdown()).isTrue(); + assertThat(publisher.getExecutorAwaitTerminationMillis()).isEqualTo(30000L); assertThat(publisher.getEnableMessageOrdering()).isTrue(); assertThat(publisher.getEndpoint()).isEqualTo("fake-endpoint"); }