Skip to content

Commit

Permalink
fix: add config options to facilitate late shutdown of pubsub publish…
Browse files Browse the repository at this point in the history
…er ThreadPoolTaskScheduler (#2721) (#2738)

There has been a significant revision of `ThreadPoolTaskScheduler/Executor` lifecycle capabilities as part of the spring-6.1.x release. It includes a concurrently managed stop phase for `ThreadPoolTaskScheduler/Executor`, favouring early soft shutdown. As a result, the `ThreadPoolTaskScheduler`, which is used for pubsub publishing, now shuts down immediately on `ContextClosedEvent`, thereby rejecting any further task submissions (#2721).

This PR aims to retain the default behavior of spring-6.1.x but provides config options to leverage the `lateShutdown` of the underlying `ThreadPoolTaskScheduler`, such as:
`spring.cloud.gcp.pubsub.publisher.executor-accept-tasks-after-context-close=true`.


References:
1. spring-projects/spring-framework#32109 (comment)
2. spring-projects/spring-framework@b12115b
3. spring-projects/spring-framework@a2000db
4. spring-projects/spring-framework#31019 (comment)
5. https://github.com/spring-projects/spring-framework/blob/996e66abdbaad866f0eab40bcf5628cdea92e046/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java#L482

Fixes #2721.
  • Loading branch information
jayakumarc authored Mar 28, 2024
1 parent 7dfb4de commit 13e4911
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ public GcpPubSubAutoConfiguration(
public ThreadPoolTaskScheduler pubsubPublisherThreadPool() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(this.gcpPubSubProperties.getPublisher().getExecutorThreads());
scheduler.setAcceptTasksAfterContextClose(
this.gcpPubSubProperties.getPublisher().getExecutorAcceptTasksAfterContextClose());
scheduler.setWaitForTasksToCompleteOnShutdown(
this.gcpPubSubProperties.getPublisher().getExecutorWaitForTasksToCompleteOnShutdown());
scheduler.setAwaitTerminationMillis(
this.gcpPubSubProperties.getPublisher().getExecutorAwaitTerminationMillis());
scheduler.setThreadNamePrefix("gcp-pubsub-publisher");
scheduler.setDaemon(true);
return scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,40 @@ public static class Publisher {
/** Number of threads used by every publisher. */
private int executorThreads = 4;

/**
* Default {@code false}. Passed on to the underlying `ThreadPoolTaskScheduler` property
* `acceptTasksAfterContextClose`. With this set to {@code true}, makes the
* `ThreadPoolTaskScheduler` to accept further tasks after the `ContextClosedEvent`, with the
* expense of a longer shutdown phase. The scheduler will not go through a coordinated lifecycle
* stop phase but rather only stop the remaining tasks(with a hard interrupt) on its own
* shutdown. The interrupt on the blocked threads before the JVM shuts down letting them close
* in an orderly fashion.
*/
private Boolean executorAcceptTasksAfterContextClose = false;

/**
* Default {@code false}. Passed on to the underlying `ThreadPoolTaskScheduler` property
* `waitForTasksToCompleteOnShutdown`. With this set to {@code true},(the common
* pre-spring-6.1.x behaviour) makes the `ThreadPoolTaskScheduler` to wait for scheduled tasks
* to complete on shutdown, not interrupting running tasks and executing all tasks in the queue,
* with the expense of a longer shutdown phase. The scheduler will not go through a coordinated
* lifecycle stop phase but rather only stop the tasks and wait for task completion on its own
* shutdown. This will not interrupt the running tasks, letting the JVM end and hard-stopping
* any remaining threads.
*/
private Boolean executorWaitForTasksToCompleteOnShutdown = false;

/**
* Default 0. Passed on to the underlying `ThreadPoolTaskScheduler` property
* `awaitTerminationMillis`. This property sets the maximum number of milliseconds that the
* `ThreadPoolTaskScheduler`is supposed to block on shutdown in order to wait for remaining
* tasks to complete their execution before the rest of the container continues to shut down.
* This is particularly useful if the remaining tasks are likely to need access to other
* resources that are also managed by the container. With this property, scheduler will wait for
* the given time (max) for the termination of tasks.
*/
private Long executorAwaitTerminationMillis = 0L;

/** Retry properties. */
private final Retry retry = new Retry();

Expand Down Expand Up @@ -366,6 +400,32 @@ public void setExecutorThreads(int executorThreads) {
this.executorThreads = executorThreads;
}

public Boolean getExecutorAcceptTasksAfterContextClose() {
return this.executorAcceptTasksAfterContextClose;
}

public void setExecutorAcceptTasksAfterContextClose(
Boolean executorAcceptTasksAfterContextClose) {
this.executorAcceptTasksAfterContextClose = executorAcceptTasksAfterContextClose;
}

public Boolean getExecutorWaitForTasksToCompleteOnShutdown() {
return this.executorWaitForTasksToCompleteOnShutdown;
}

public void setExecutorWaitForTasksToCompleteOnShutdown(
Boolean executorWaitForTasksToCompleteOnShutdown) {
this.executorWaitForTasksToCompleteOnShutdown = executorWaitForTasksToCompleteOnShutdown;
}

public long getExecutorAwaitTerminationMillis() {
return this.executorAwaitTerminationMillis;
}

public void setExecutorAwaitTerminationMillis(long executorAwaitTerminationMillis) {
this.executorAwaitTerminationMillis = executorAwaitTerminationMillis;
}

public Boolean getEnableMessageOrdering() {
return enableMessageOrdering;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ void testDefaultPublisherProperties() {
pubSubConfiguration.initialize("projectId");

assertThat(publisher.getExecutorThreads()).isEqualTo(4);
assertThat(publisher.getExecutorAcceptTasksAfterContextClose()).isFalse();
assertThat(publisher.getExecutorWaitForTasksToCompleteOnShutdown()).isFalse();
assertThat(publisher.getExecutorAwaitTerminationMillis()).isEqualTo(0L);
assertThat(publisher.getEnableMessageOrdering()).isNull();
assertThat(publisher.getEndpoint()).isNull();
assertThat(batching.getElementCountThreshold()).isNull();
Expand All @@ -565,12 +568,18 @@ void testDefaultPublisherProperties() {
@Test
void testPublisherProperties() {
publisher.setExecutorThreads(5);
publisher.setExecutorAcceptTasksAfterContextClose(true);
publisher.setExecutorWaitForTasksToCompleteOnShutdown(true);
publisher.setExecutorAwaitTerminationMillis(30000);
publisher.setEnableMessageOrdering(true);
publisher.setEndpoint("fake-endpoint");

pubSubConfiguration.initialize("projectId");

assertThat(publisher.getExecutorThreads()).isEqualTo(5);
assertThat(publisher.getExecutorAcceptTasksAfterContextClose()).isTrue();
assertThat(publisher.getExecutorWaitForTasksToCompleteOnShutdown()).isTrue();
assertThat(publisher.getExecutorAwaitTerminationMillis()).isEqualTo(30000L);
assertThat(publisher.getEnableMessageOrdering()).isTrue();
assertThat(publisher.getEndpoint()).isEqualTo("fake-endpoint");
}
Expand Down

0 comments on commit 13e4911

Please sign in to comment.