Skip to content

Commit

Permalink
fix: add config options to facilitate late shutdown of pubsub publish…
Browse files Browse the repository at this point in the history
…er ThreadPoolTaskScheduler (#2721)
  • Loading branch information
jayakumarc committed Mar 26, 2024
1 parent 75230b2 commit 159895f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ public GcpPubSubAutoConfiguration(
public ThreadPoolTaskScheduler pubsubPublisherThreadPool() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(this.gcpPubSubProperties.getPublisher().getExecutorThreads());
scheduler.setAcceptTasksAfterContextClose(
this.gcpPubSubProperties.getPublisher().getExecutorAcceptTasksAfterContextClose());
scheduler.setWaitForTasksToCompleteOnShutdown(
this.gcpPubSubProperties.getPublisher().getExecutorWaitForTasksToCompleteOnShutdown());
scheduler.setAwaitTerminationMillis(
this.gcpPubSubProperties.getPublisher().getExecutorAwaitTerminationMillis());
scheduler.setThreadNamePrefix("gcp-pubsub-publisher");
scheduler.setDaemon(true);
return scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,40 @@ public static class Publisher {
/** Number of threads used by every publisher. */
private int executorThreads = 4;

/**
* Default {@code false}. Passed on to the underlying `ThreadPoolTaskScheduler` property
* `acceptTasksAfterContextClose`. With this set to {@code true}, makes the
* `ThreadPoolTaskScheduler` to accept further tasks after the `ContextClosedEvent`, with the
* expense of a longer shutdown phase. The scheduler will not go through a coordinated lifecycle
* stop phase but rather only stop the remaining tasks(with a hard interrupt) on its own
* shutdown. The interrupt on the blocked threads before the JVM shuts down letting them close
* in an orderly fashion.
*/
private Boolean executorAcceptTasksAfterContextClose = false;

/**
* Default {@code false}. Passed on to the underlying `ThreadPoolTaskScheduler` property
* `waitForTasksToCompleteOnShutdown`. With this set to {@code true},(the common
* pre-spring-6.1.x behaviour) makes the `ThreadPoolTaskScheduler` to wait for scheduled tasks
* to complete on shutdown, not interrupting running tasks and executing all tasks in the queue,
* with the expense of a longer shutdown phase. The scheduler will not go through a coordinated
* lifecycle stop phase but rather only stop the tasks and wait for task completion on its own
* shutdown. This will not interrupt the running tasks, letting the JVM end and hard-stopping
* any remaining threads.
*/
private Boolean executorWaitForTasksToCompleteOnShutdown = false;

/**
* Default 0. Passed on to the underlying `ThreadPoolTaskScheduler` property
* `awaitTerminationMillis`. This property sets the maximum number of milliseconds that the
* `ThreadPoolTaskScheduler`is supposed to block on shutdown in order to wait for remaining
* tasks to complete their execution before the rest of the container continues to shut down.
* This is particularly useful if the remaining tasks are likely to need access to other
* resources that are also managed by the container. With this property, scheduler will wait for
* the given time (max) for the termination of tasks.
*/
private Long executorAwaitTerminationMillis = 0L;

/** Retry properties. */
private final Retry retry = new Retry();

Expand Down Expand Up @@ -366,6 +400,32 @@ public void setExecutorThreads(int executorThreads) {
this.executorThreads = executorThreads;
}

public Boolean getExecutorAcceptTasksAfterContextClose() {
return this.executorAcceptTasksAfterContextClose;
}

public void setExecutorAcceptTasksAfterContextClose(
Boolean executorAcceptTasksAfterContextClose) {
this.executorAcceptTasksAfterContextClose = executorAcceptTasksAfterContextClose;
}

public Boolean getExecutorWaitForTasksToCompleteOnShutdown() {
return this.executorWaitForTasksToCompleteOnShutdown;
}

public void setExecutorWaitForTasksToCompleteOnShutdown(
Boolean executorWaitForTasksToCompleteOnShutdown) {
this.executorWaitForTasksToCompleteOnShutdown = executorWaitForTasksToCompleteOnShutdown;
}

public long getExecutorAwaitTerminationMillis() {
return this.executorAwaitTerminationMillis;
}

public void setExecutorAwaitTerminationMillis(long executorAwaitTerminationMillis) {
this.executorAwaitTerminationMillis = executorAwaitTerminationMillis;
}

public Boolean getEnableMessageOrdering() {
return enableMessageOrdering;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ void testDefaultPublisherProperties() {
pubSubConfiguration.initialize("projectId");

assertThat(publisher.getExecutorThreads()).isEqualTo(4);
assertThat(publisher.getExecutorAcceptTasksAfterContextClose()).isFalse();
assertThat(publisher.getExecutorWaitForTasksToCompleteOnShutdown()).isFalse();
assertThat(publisher.getExecutorAwaitTerminationMillis()).isEqualTo(0L);
assertThat(publisher.getEnableMessageOrdering()).isNull();
assertThat(publisher.getEndpoint()).isNull();
assertThat(batching.getElementCountThreshold()).isNull();
Expand All @@ -565,12 +568,18 @@ void testDefaultPublisherProperties() {
@Test
void testPublisherProperties() {
publisher.setExecutorThreads(5);
publisher.setExecutorAcceptTasksAfterContextClose(true);
publisher.setExecutorWaitForTasksToCompleteOnShutdown(true);
publisher.setExecutorAwaitTerminationMillis(30000);
publisher.setEnableMessageOrdering(true);
publisher.setEndpoint("fake-endpoint");

pubSubConfiguration.initialize("projectId");

assertThat(publisher.getExecutorThreads()).isEqualTo(5);
assertThat(publisher.getExecutorAcceptTasksAfterContextClose()).isTrue();
assertThat(publisher.getExecutorWaitForTasksToCompleteOnShutdown()).isTrue();
assertThat(publisher.getExecutorAwaitTerminationMillis()).isEqualTo(30000L);
assertThat(publisher.getEnableMessageOrdering()).isTrue();
assertThat(publisher.getEndpoint()).isEqualTo("fake-endpoint");
}
Expand Down

0 comments on commit 159895f

Please sign in to comment.