Skip to content

Commit

Permalink
fix: re-new connection in PostgresChannelMessageTableSubscriber
Browse files Browse the repository at this point in the history
PostgresChannelMessageTableSubscriber currently never renews the connection.
This causes problems on DB failover. With this change the connection is
renewed when notifications are not received for a certain time.

fixes #9061
  • Loading branch information
joshiste committed Apr 1, 2024
1 parent 627cde2 commit 3221eeb
Showing 1 changed file with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -86,6 +87,8 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc
@Nullable
private volatile PgConnection connection;

private Duration notificationTimeout = Duration.ofSeconds(60);

/**
* Create a new subscriber using the {@link JdbcChannelMessageStore#DEFAULT_TABLE_PREFIX}.
* @param connectionSupplier The connection supplier for the targeted Postgres database.
Expand Down Expand Up @@ -193,24 +196,28 @@ private void doStart(CountDownLatch startingLatch) {
while (isActive()) {
startingLatch.countDown();

PGNotification[] notifications = conn.getNotifications(0);
PGNotification[] notifications = conn.getNotifications((int) this.notificationTimeout.toMillis());
// Unfortunately, there is no good way of interrupting a notification
// poll but by closing its connection.
if (!isActive()) {
return;
}
if (notifications != null) {
for (PGNotification notification : notifications) {
String parameter = notification.getParameter();
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
if (subscriptions == null) {
continue;
}
for (Subscription subscription : subscriptions) {
subscription.notifyUpdate();
}
if (notifications == null || notifications.length == 0) {
//We did not receive any notifications within the timeout period.
//We will close the connection and re-establish it.
break;
}
for (PGNotification notification : notifications) {
String parameter = notification.getParameter();
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
if (subscriptions == null) {
continue;
}
for (Subscription subscription : subscriptions) {
subscription.notifyUpdate();
}
}

}
}
finally {
Expand Down Expand Up @@ -278,6 +285,19 @@ private static String getKey(Object input) {
return input == null ? null : UUIDConverter.getUUID(input).toString();
}

/**
* Set the timeout for the notification polling.
* If for the specified duration no notificiation are received the underlying connection is closed and re-established.
* Setting a value of {@code Duration.ZERO} will disable the timeout and wait forever.
* This might cause problems in DB failover scenarios.
*
* @param notificationTimeout the timeout for the notification polling.
*/
public void setNotificationTimeout(Duration notificationTimeout) {
Assert.notNull(notificationTimeout, "'notificationTimeout' must not be null.");
this.notificationTimeout = notificationTimeout;
}

/**
* A subscription to a {@link PostgresChannelMessageTableSubscriber} for
* receiving push notifications for new messages that are added to
Expand Down

0 comments on commit 3221eeb

Please sign in to comment.