Skip to content

Commit

Permalink
fix: re-new connection in PostgresChannelMessageTableSubscriber
Browse files Browse the repository at this point in the history
PostgresChannelMessageTableSubscriber never renews the connection.
This causes problems on DB failover. With this change the connection is
renewed when notifications are not received for a certain time.

fixes #9061
  • Loading branch information
joshiste committed Apr 1, 2024
1 parent 627cde2 commit 4b3e74c
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -62,6 +63,7 @@
* @author Artem Bilan
* @author Igor Lovich
* @author Christian Tzolov
* @author Johannes Edmeier
*
* @since 6.0
*/
Expand All @@ -86,6 +88,8 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc
@Nullable
private volatile PgConnection connection;

private Duration notificationTimeout = Duration.ofSeconds(60);

/**
* Create a new subscriber using the {@link JdbcChannelMessageStore#DEFAULT_TABLE_PREFIX}.
* @param connectionSupplier The connection supplier for the targeted Postgres database.
Expand Down Expand Up @@ -116,6 +120,19 @@ public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}

/**
* Set the timeout for the notification polling.
* If for the specified duration no notificiation are received the underlying connection is closed and re-established.
* Setting a value of {@code Duration.ZERO} will disable the timeout and wait forever.
* This might cause problems in DB failover scenarios.
* @param notificationTimeout the timeout for the notification polling.
* @since 6.1.8
*/
public void setNotificationTimeout(Duration notificationTimeout) {
Assert.notNull(notificationTimeout, "'notificationTimeout' must not be null.");
this.notificationTimeout = notificationTimeout;
}

/**
* Add a new subscription to this subscriber.
* @param subscription The subscription to register.
Expand Down Expand Up @@ -193,24 +210,28 @@ private void doStart(CountDownLatch startingLatch) {
while (isActive()) {
startingLatch.countDown();

PGNotification[] notifications = conn.getNotifications(0);
PGNotification[] notifications = conn.getNotifications((int) this.notificationTimeout.toMillis());
// Unfortunately, there is no good way of interrupting a notification
// poll but by closing its connection.
if (!isActive()) {
return;
}
if (notifications != null) {
for (PGNotification notification : notifications) {
String parameter = notification.getParameter();
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
if (subscriptions == null) {
continue;
}
for (Subscription subscription : subscriptions) {
subscription.notifyUpdate();
}
if (notifications == null || notifications.length == 0) {
//We did not receive any notifications within the timeout period.
//We will close the connection and re-establish it.
break;
}
for (PGNotification notification : notifications) {
String parameter = notification.getParameter();
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
if (subscriptions == null) {
continue;
}
for (Subscription subscription : subscriptions) {
subscription.notifyUpdate();
}
}

}
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.springframework.integration.jdbc.channel;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -62,6 +64,7 @@
* @author Artem Bilan
* @author Igor Lovich
* @author Adama Sorho
* @author Johannes Edmeier
*
* @since 6.0
*/
Expand Down Expand Up @@ -102,15 +105,14 @@ CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()

private String groupId;

private ConnectionSupplier connectionSupplier;

@BeforeEach
void setUp(TestInfo testInfo) {
// Not initiated as a bean to allow for registrations prior and post the life cycle
this.postgresChannelMessageTableSubscriber =
new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(),
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword())
.unwrap(PgConnection.class));
this.connectionSupplier = new ConnectionSupplier();
this.postgresChannelMessageTableSubscriber = new PostgresChannelMessageTableSubscriber(connectionSupplier);
this.postgresChannelMessageTableSubscriber.setNotificationTimeout(Duration.ofSeconds(5));

this.taskExecutor = new ThreadPoolTaskExecutor();
this.taskExecutor.setCorePoolSize(10);
Expand Down Expand Up @@ -261,6 +263,26 @@ void testRetryOnErrorDuringDispatch(boolean transactionsEnabled) throws Interrup
assertThat(payloads).containsExactly("1");
}

@Test
public void testRenewConnection() throws Exception {
CountDownLatch latch = new CountDownLatch(2);
List<Object> payloads = new ArrayList<>();
CountDownLatch connectionLatch = new CountDownLatch(2);
connectionSupplier.onGetConnection = connectionLatch::countDown;
postgresChannelMessageTableSubscriber.start();
postgresSubscribableChannel.subscribe(message -> {
payloads.add(message.getPayload());
latch.countDown();
});

assertThat(connectionLatch.await(10, TimeUnit.SECONDS)).isTrue();

messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
messageStore.addMessageToGroup(groupId, new GenericMessage<>("2"));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(payloads).containsExactly("1", "2");
}

@Configuration
@EnableIntegration
public static class Config {
Expand Down Expand Up @@ -300,4 +322,21 @@ public JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) {

}

private static class ConnectionSupplier implements PgConnectionSupplier {

Runnable onGetConnection;

@Override
public PgConnection get() throws SQLException {
var conn = DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(),
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword())
.unwrap(PgConnection.class);
if (this.onGetConnection != null) {
this.onGetConnection.run();
}
return conn;
}

}
}

0 comments on commit 4b3e74c

Please sign in to comment.