Skip to content

Commit

Permalink
PostgresChannelMessageTableSubscriber: Renew connection only if invalid
Browse files Browse the repository at this point in the history
Fixes: #9111

An evolution of the #9061: renew the connection only when we need to.

**Auto-cherry-pick to `6.2.x` & `6.1.x`**
  • Loading branch information
joshiste authored and artembilan committed May 21, 2024
1 parent 899598a commit da29e2d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,10 @@ private void doStart(CountDownLatch startingLatch) {
if (!isActive()) {
return;
}
if (notifications == null || notifications.length == 0) {
if ((notifications == null || notifications.length == 0) && !conn.isValid(1)) {
//We did not receive any notifications within the timeout period.
//We will close the connection and re-establish it.
//If the connection is still valid, we will continue polling
//Otherwise, we will close the connection and re-establish it.
break;
}
for (PGNotification notification : notifications) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import javax.sql.DataSource;

Expand Down Expand Up @@ -268,7 +270,18 @@ public void testRenewConnection() throws Exception {
CountDownLatch latch = new CountDownLatch(2);
List<Object> payloads = new ArrayList<>();
CountDownLatch connectionLatch = new CountDownLatch(2);
connectionSupplier.onGetConnection = connectionLatch::countDown;
AtomicBoolean connectionCloseState = new AtomicBoolean();
connectionSupplier.onGetConnection = conn -> {
connectionLatch.countDown();
if (connectionCloseState.compareAndSet(false, true)) {
try {
conn.close();
}
catch (Exception e) {
//nop
}
}
};
postgresChannelMessageTableSubscriber.start();
postgresSubscribableChannel.subscribe(message -> {
payloads.add(message.getPayload());
Expand Down Expand Up @@ -324,7 +337,7 @@ public JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) {

private static class ConnectionSupplier implements PgConnectionSupplier {

Runnable onGetConnection;
Consumer<PgConnection> onGetConnection;

@Override
public PgConnection get() throws SQLException {
Expand All @@ -333,10 +346,11 @@ public PgConnection get() throws SQLException {
POSTGRES_CONTAINER.getPassword())
.unwrap(PgConnection.class);
if (this.onGetConnection != null) {
this.onGetConnection.run();
this.onGetConnection.accept(conn);
}
return conn;
}

}

}

0 comments on commit da29e2d

Please sign in to comment.