Connection leaks (not released) when concurrent query is canceled #198

Open
agorbachenko opened this issue Sep 7, 2023 · 20 comments
Labels
status: waiting-for-triage An issue we've not yet triaged

Comments

@agorbachenko

Bug Report

Versions

  • Driver: 1.0.2.RELEASE
  • Database: PostgreSQL 13.12
  • Java: 17
  • OS: MacOS, Linux

Current Behavior

When a database query is zipped with another action that fails, the query is canceled and its database connection is released. However, the cancellation also appears to affect another query running concurrently, whose connection is then never released.

See the reproducer project for the details: https://github.com/agorbachenko/r2dbc-connection-leak-demo

Output sample:

2023-09-07T10:15:44.303+03:00  INFO 34234 --- [           main] c.e.r.R2dbcConnectionLeakDemoApplication : Started R2dbcConnectionLeakDemoApplication in 3.649 seconds (process running for 4.734)
2023-09-07T10:15:44.560+03:00 DEBUG 34234 --- [actor-tcp-nio-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:44.564+03:00 DEBUG 34234 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2023-09-07T10:15:45.326+03:00 DEBUG 34234 --- [     parallel-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:45.331+03:00 DEBUG 34234 --- [     parallel-2] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo]
2023-09-07T10:15:45.345+03:00  INFO 34234 --- [     parallel-5] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:45.346+03:00 DEBUG 34234 --- [     parallel-5] io.r2dbc.pool.PooledConnection           : Releasing connection
2023-09-07T10:15:45.347+03:00 DEBUG 34234 --- [     parallel-5] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:45.348+03:00 DEBUG 34234 --- [     parallel-5] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo WHERE demo.id = $1 LIMIT 2]
2023-09-07T10:15:45.364+03:00 DEBUG 34234 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2023-09-07T10:15:46.326+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.acquired: 0.0
2023-09-07T10:15:46.327+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.pending: 0.0
2023-09-07T10:15:46.327+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.idle: 1.0
2023-09-07T10:15:46.365+03:00 DEBUG 34234 --- [     parallel-6] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:46.366+03:00 DEBUG 34234 --- [     parallel-6] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo]
2023-09-07T10:15:46.378+03:00  INFO 34234 --- [     parallel-8] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:46.378+03:00 DEBUG 34234 --- [     parallel-8] io.r2dbc.pool.PooledConnection           : Releasing connection
2023-09-07T10:15:46.379+03:00 DEBUG 34234 --- [     parallel-8] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:46.379+03:00 DEBUG 34234 --- [     parallel-8] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo WHERE demo.id = $1 LIMIT 2]
2023-09-07T10:15:47.393+03:00  INFO 34234 --- [     parallel-2] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:48.322+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.acquired: 1.0
2023-09-07T10:15:48.322+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.pending: 0.0
2023-09-07T10:15:48.322+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.idle: 0.0
2023-09-07T10:15:48.413+03:00  INFO 34234 --- [     parallel-4] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:49.427+03:00  INFO 34234 --- [     parallel-6] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:50.323+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.acquired: 1.0
2023-09-07T10:15:50.323+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.pending: 0.0
2023-09-07T10:15:50.323+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.idle: 0.0
2023-09-07T10:15:50.446+03:00  INFO 34234 --- [     parallel-8] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:51.464+03:00  INFO 34234 --- [     parallel-2] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled

This connection was not released:

2023-09-07T10:15:46.379+03:00 DEBUG 34234 --- [     parallel-8] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:46.379+03:00 DEBUG 34234 --- [     parallel-8] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo WHERE demo.id = $1 LIMIT 2]

pg_stat_activity shows that the connection has not changed state since then.
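
One rough way to spot such an unreleased connection is to count the "Obtaining new connection from the pool" and "Releasing connection" lines in the DEBUG output. The following is a minimal sketch, assuming those two message fragments exactly as they appear in the output sample; the class and method names (PoolLogBalance, outstanding) are made up for illustration and are not part of r2dbc-pool. The acquire message appears to be logged when a connection is requested, so a pending request that gets canceled would be counted too; treat the result as a hint, not as proof of a leak.

```java
import java.util.List;

// Toy log checker: counts pool acquisitions that have no matching release.
final class PoolLogBalance {

    // Message fragments copied from the DEBUG output quoted above.
    static final String ACQUIRE = "Obtaining new connection from the pool";
    static final String RELEASE = "Releasing connection";

    /** Returns acquisitions minus releases, never dropping below zero. */
    static int outstanding(List<String> logLines) {
        int open = 0;
        for (String line : logLines) {
            if (line.contains(ACQUIRE)) {
                open++;
            } else if (line.contains(RELEASE) && open > 0) {
                open--;
            }
        }
        return open;
    }

    public static void main(String[] args) {
        // Abbreviated pool lines from the 10:15:46 cycle of the output sample.
        List<String> lines = List.of(
            "[parallel-6] io.r2dbc.pool.ConnectionPool   : Obtaining new connection from the pool",
            "[parallel-8] io.r2dbc.pool.PooledConnection : Releasing connection",
            "[parallel-8] io.r2dbc.pool.ConnectionPool   : Obtaining new connection from the pool");
        System.out.println("outstanding connections: " + outstanding(lines));
    }
}
```

For the three pool lines of the 10:15:46 cycle this yields 1, which matches the r2dbc.pool.acquired: 1.0 metric reported afterwards.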


@agorbachenko agorbachenko changed the title Connection not released when ANOTHER query is cancelled Connection leaks (not released) when the concurrent query is canceled Nov 18, 2023
@agorbachenko agorbachenko changed the title Connection leaks (not released) when the concurrent query is canceled Connection leaks (not released) when concurrent query is canceled Nov 18, 2023
@jivkodobrev

jivkodobrev commented Jan 24, 2024

@agorbachenko Hi Alexei - I ran your repro application but was not able to reproduce the issue. It ran for about 10 minutes locally on my machine, built with Java 18 and started via gradle bootRun.
Is there something specific that needs to be done to reproduce it?

We have an issue which seems similar (#206) and I was hoping to observe the repro to understand the problem.
Thanks in advance!

UPDATE: I actually was able to reproduce the issue, it ran for about 25 minutes and ran into the problem. I will investigate and will keep you posted here.

cc @SimoneGiusso

@SimoneGiusso

SimoneGiusso commented Feb 8, 2024

I actually was able to reproduce the issue, it ran for about 25 minutes and ran into the problem.

By the way, if you change the delaySubscription in the runSomethingBadPeriodically method to 100ms or even 10ms, the issue manifests after a few seconds. I also propose dropping Spring's DefaultDatabaseClient, both to make the reproducer lighter and to rule out an issue coming from Spring. I also used a single thread:

package com.example.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;

import static io.r2dbc.pool.PoolingConnectionFactoryProvider.INITIAL_SIZE;
import static io.r2dbc.pool.PoolingConnectionFactoryProvider.MAX_SIZE;
import static io.r2dbc.spi.ConnectionFactoryOptions.*;

@Component
public class R2dbcConnectionLeakDemo {

    private final ConnectionFactory pooledConnectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
        .option(DRIVER, "pool")
        .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
        .option(HOST, "127.0.0.1")
        .option(PORT, 5432)
        .option(USER, "postgres")
        .option(PASSWORD, "demo")
        .option(DATABASE, "postgres")
        .option(MAX_SIZE, 1)
        .option(INITIAL_SIZE, 1)
        .build());

    @EventListener(ApplicationReadyEvent.class)
    private void runGoodQueryPeriodically() {
        Mono.usingWhen(pooledConnectionFactory.create(),
                connection -> Mono.from(connection.createStatement("SELECT demo.* FROM demo WHERE demo.id = 1 LIMIT 2")
                    .execute()),
                Connection::close)
            .delaySubscription(Duration.ofMillis(1000), Schedulers.single())
            .repeat()
            .subscribe();
    }

    @EventListener(ApplicationReadyEvent.class)
    private void runSomethingBadPeriodically() {
        getDemoEntitiesAsList()
            .zipWith(runSomethingFailing())
            .delaySubscription(Duration.ofMillis(10), Schedulers.single())
            // resume on error to repeat
            .onErrorResume(e -> Mono.empty())
            .repeat()
            .subscribe();
    }

    private Mono<List<Integer>> getDemoEntitiesAsList() {
        return Mono.usingWhen(pooledConnectionFactory.create(),
                connection -> Mono.from(connection.createStatement("SELECT demo.id FROM demo")
                    .execute()),
                Connection::close)
            .map(result -> result.map(readable -> readable.get(0, Integer.class)))
            .flatMapMany(Flux::from)
            .collectList();
    }

    private static Mono<?> runSomethingFailing() {
        return Mono.error(RuntimeException::new)
            .delaySubscription(Duration.ofMillis(10), Schedulers.single());
    }

}

I can confirm that the issue is still present:

...
2024-02-09T12:30:49.868+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-09T12:30:49.880+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-09T12:30:49.891+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-09T12:30:49.904+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-09T12:30:49.917+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-09T12:30:49.923+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
// After a while it cannot obtain a connection anymore and it hangs here
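
To illustrate why a single unreleased connection is enough to stall this reproducer, here is a toy model of a pool with MAX_SIZE = 1. It is only a sketch built on java.util.concurrent.Semaphore, not r2dbc-pool code; SingleSlotPool and its methods are hypothetical names. The timeout exists only so that the example terminates; the reproducer above configures no acquire timeout, which would be consistent with the log simply stopping.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Toy stand-in for a pool with MAX_SIZE = 1; this is not r2dbc-pool code.
final class SingleSlotPool {

    private final Semaphore slot = new Semaphore(1);

    /** Tries to borrow the only connection, giving up after the timeout. */
    boolean acquire(long timeoutMillis) {
        try {
            return slot.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the acquisition as failed.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns the connection to the pool. */
    void release() {
        slot.release();
    }

    public static void main(String[] args) {
        SingleSlotPool pool = new SingleSlotPool();

        // Normal cycle: acquire followed by release.
        System.out.println("first acquire:  " + pool.acquire(100));
        pool.release();

        // Leaking cycle: the borrower never calls release().
        System.out.println("second acquire: " + pool.acquire(100));

        // Every later borrower now waits until its timeout expires.
        System.out.println("third acquire:  " + pool.acquire(100));
    }
}
```

The third acquire returns false once the timeout expires, because the second borrower never gave the only slot back.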

@jivkodobrev

Thanks! I haven't had the chance to investigate, hopefully I can get back to this in the beginning of March

@mp911de
Member

mp911de commented Feb 12, 2024

You can add a bit more debugging logging via Mono.log(), ideally on the Mono producing usingWhen to detect whether a cancellation signal is being propagated into usingWhen.

@SimoneGiusso

SimoneGiusso commented Feb 12, 2024

You can add a bit more debugging logging via Mono.log(), ideally on the Mono producing usingWhen to detect whether a cancellation signal is being propagated into usingWhen.

It seems to be propagated?

I updated the code to use different threads as well, which makes the log clearer:

package com.example.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;

import static io.r2dbc.pool.PoolingConnectionFactoryProvider.INITIAL_SIZE;
import static io.r2dbc.pool.PoolingConnectionFactoryProvider.MAX_SIZE;
import static io.r2dbc.spi.ConnectionFactoryOptions.*;

@Component
public class R2dbcConnectionLeakDemo {

    private final ConnectionFactory pooledConnectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
        .option(DRIVER, "pool")
        .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
        .option(HOST, "127.0.0.1")
        .option(PORT, 5432)
        .option(USER, "postgres")
        .option(PASSWORD, "demo")
        .option(DATABASE, "postgres")
        .option(MAX_SIZE, 1)
        .option(INITIAL_SIZE, 1)
        .build());

    @EventListener(ApplicationReadyEvent.class)
    private void runGoodQueryPeriodically() {
        Mono.usingWhen(pooledConnectionFactory.create(),
                connection -> Mono.from(connection.createStatement("SELECT demo.* FROM demo WHERE demo.id = 1 LIMIT 2")
                    .execute()),
                Connection::close)
            .log()
            .delaySubscription(Duration.ofMillis(1000), Schedulers.newSingle("GoodThread"))
            .repeat()
            .subscribe();
    }

    @EventListener(ApplicationReadyEvent.class)
    private void runSomethingBadPeriodically() {
        getDemoEntitiesAsList()
            .zipWith(runSomethingFailing())
            .delaySubscription(Duration.ofMillis(10), Schedulers.newSingle("LeakingThread"))
            // resume on error to repeat
            .onErrorResume(e -> Mono.empty())
            .repeat()
            .subscribe();
    }

    private Mono<List<Integer>> getDemoEntitiesAsList() {
        return Mono.usingWhen(pooledConnectionFactory.create(),
                connection -> Mono.from(connection.createStatement("SELECT demo.id FROM demo")
                    .execute()),
                Connection::close)
            .log()
            .map(result -> result.map(readable -> readable.get(0, Integer.class)))
            .flatMapMany(Flux::from)
            .collectList();
    }

    private static Mono<?> runSomethingFailing() {
        return Mono.error(RuntimeException::new)
            .delaySubscription(Duration.ofMillis(10), Schedulers.newSingle("BadThread"));
    }

}
2024-02-12T12:58:33.743+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.743+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.743+01:00 DEBUG 24946 --- [LeakingThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T12:58:33.755+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.756+01:00 DEBUG 24946 --- [    BadThread-4] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T12:58:33.768+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.768+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.769+01:00 DEBUG 24946 --- [LeakingThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T12:58:33.781+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.782+01:00 DEBUG 24946 --- [    BadThread-4] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T12:58:33.794+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.794+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.795+01:00 DEBUG 24946 --- [LeakingThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T12:58:33.807+01:00  INFO 24946 --- [   GoodThread-2] reactor.Mono.UsingWhen.2                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.807+01:00  INFO 24946 --- [   GoodThread-2] reactor.Mono.UsingWhen.2                 : request(unbounded)
2024-02-12T12:58:33.807+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.807+01:00 DEBUG 24946 --- [    BadThread-4] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T12:58:33.807+01:00 DEBUG 24946 --- [    BadThread-4] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T12:58:33.821+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.821+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.833+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.845+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.845+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.858+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.871+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.871+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.883+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.896+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.896+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.907+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()

It seems that the cancellation signal raised on the BadThread triggers the release of the connection. However, something goes wrong when there is a concurrent query, i.e. when the GoodThread is about to acquire a connection while the BadThread is releasing the connection and acquiring it again right afterwards. From this point on everything is stuck. The GoodThread is probably waiting for the connection (I don't see any further log output from this thread).

The odd thing is that the LeakingThread + BadThread pair keeps running (why?), yet apparently no longer tries to get a connection from the pool; otherwise I would not see it logging repeatedly:

2024-02-12T12:58:35.134+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:35.134+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:35.146+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()

I changed this line:

.delaySubscription(Duration.ofMillis(100), Schedulers.newSingle("LeakingThread")) // delay increased 10 times

and the log changes slightly:

2024-02-12T13:11:01.839+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:01.840+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:11:01.840+01:00 DEBUG 25171 --- [LeakingThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:11:01.851+01:00  INFO 25171 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T13:11:01.852+01:00 DEBUG 25171 --- [    BadThread-4] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:11:01.923+01:00  INFO 25171 --- [   GoodThread-2] reactor.Mono.UsingWhen.2                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:01.924+01:00  INFO 25171 --- [   GoodThread-2] reactor.Mono.UsingWhen.2                 : request(unbounded)
2024-02-12T13:11:01.924+01:00 DEBUG 25171 --- [   GoodThread-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:11:01.957+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:01.957+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:11:01.970+01:00  INFO 25171 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T13:11:02.073+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:02.074+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:11:02.086+01:00  INFO 25171 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T13:11:02.192+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:02.192+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:11:02.205+01:00  INFO 25171 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()

Now the GoodThread also logs its attempt to obtain a new connection, which was not the case before. Moreover, the BadThread seems to have already released the connection. The leak occurs nevertheless.

Finally, I increased the delay to 1250ms, so the bad query now runs less frequently than the good query, which runs every 1000ms:

.delaySubscription(Duration.ofMillis(1250), Schedulers.newSingle("LeakingThread"))

At least at the beginning everything seems to work, and the log looks completely different:

2024-02-12T13:20:49.931+01:00 DEBUG 25449 --- [LeakingThread-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:20:49.936+01:00 DEBUG 25449 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:20:49.936+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.2                 : onNext(PostgresqlResult{context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=null, errorResponseLogLevel=DEBUG, database='postgres', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda$642/0x00000008004bc240@34d713a2, forceBinary='false', lockWaitTimeout='null, loopResources='null', multiHostConfiguration='null', noticeLogLevel='DEBUG', options='{}', password='****', preferAttachedBuffers=false, singleHostConfiguration=SingleHostConfiguration{host='127.0.0.1', port=5432, socket='null'}, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, timeZone=sun.util.calendar.ZoneInfo[id="Europe/Zurich",offset=3600000,dstSavings=3600000,useDaylight=true,transitions=119,lastRule=java.util.SimpleTimeZone[id=Europe/Zurich,offset=3600000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=2,startMonth=2,startDay=-1,startDayOfWeek=1,startTime=3600000,startTimeMode=2,endMode=2,endMonth=9,endDay=-1,endDayOfWeek=1,endTime=3600000,endTimeMode=2]], username='postgres'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@ce2d296, statementCache=IndefiniteStatementCache{cache={}, counter=0}}, messages=WindowFlux})
2024-02-12T13:20:49.936+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.2                 : onComplete()
2024-02-12T13:20:49.946+01:00  INFO 25449 --- [    BadThread-4] reactor.Mono.UsingWhen.2                 : cancel()
2024-02-12T13:20:50.791+01:00  INFO 25449 --- [   GoodThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:20:50.792+01:00  INFO 25449 --- [   GoodThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:20:50.793+01:00 DEBUG 25449 --- [   GoodThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:20:50.800+01:00 DEBUG 25449 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:20:50.801+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.1                 : onNext(PostgresqlResult{context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=null, errorResponseLogLevel=DEBUG, database='postgres', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda$642/0x00000008004bc240@34d713a2, forceBinary='false', lockWaitTimeout='null, loopResources='null', multiHostConfiguration='null', noticeLogLevel='DEBUG', options='{}', password='****', preferAttachedBuffers=false, singleHostConfiguration=SingleHostConfiguration{host='127.0.0.1', port=5432, socket='null'}, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, timeZone=sun.util.calendar.ZoneInfo[id="Europe/Zurich",offset=3600000,dstSavings=3600000,useDaylight=true,transitions=119,lastRule=java.util.SimpleTimeZone[id=Europe/Zurich,offset=3600000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=2,startMonth=2,startDay=-1,startDayOfWeek=1,startTime=3600000,startTimeMode=2,endMode=2,endMonth=9,endDay=-1,endDayOfWeek=1,endTime=3600000,endTimeMode=2]], username='postgres'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@ce2d296, statementCache=IndefiniteStatementCache{cache={}, counter=0}}, messages=WindowFlux})
2024-02-12T13:20:50.801+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.1                 : onComplete()
2024-02-12T13:20:51.202+01:00  INFO 25449 --- [LeakingThread-2] reactor.Mono.UsingWhen.2                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:20:51.203+01:00  INFO 25449 --- [LeakingThread-2] reactor.Mono.UsingWhen.2                 : request(unbounded)
2024-02-12T13:20:51.203+01:00 DEBUG 25449 --- [LeakingThread-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:20:51.209+01:00 DEBUG 25449 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:20:51.209+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.2                 : onNext(PostgresqlResult{context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=null, errorResponseLogLevel=DEBUG, database='postgres', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda$642/0x00000008004bc240@34d713a2, forceBinary='false', lockWaitTimeout='null, loopResources='null', multiHostConfiguration='null', noticeLogLevel='DEBUG', options='{}', password='****', preferAttachedBuffers=false, singleHostConfiguration=SingleHostConfiguration{host='127.0.0.1', port=5432, socket='null'}, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, timeZone=sun.util.calendar.ZoneInfo[id="Europe/Zurich",offset=3600000,dstSavings=3600000,useDaylight=true,transitions=119,lastRule=java.util.SimpleTimeZone[id=Europe/Zurich,offset=3600000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=2,startMonth=2,startDay=-1,startDayOfWeek=1,startTime=3600000,startTimeMode=2,endMode=2,endMonth=9,endDay=-1,endDayOfWeek=1,endTime=3600000,endTimeMode=2]], username='postgres'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@ce2d296, statementCache=IndefiniteStatementCache{cache={}, counter=0}}, messages=WindowFlux})
2024-02-12T13:20:51.209+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.2                 : onComplete()
2024-02-12T13:20:51.217+01:00  INFO 25449 --- [    BadThread-4] reactor.Mono.UsingWhen.2                 : cancel()
2024-02-12T13:20:51.805+01:00  INFO 25449 --- [   GoodThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:20:51.806+01:00  INFO 25449 --- [   GoodThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:20:51.806+01:00 DEBUG 25449 --- [   GoodThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:20:51.812+01:00 DEBUG 25449 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:20:51.813+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.1                 : onNext(PostgresqlResult{context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=null, errorResponseLogLevel=DEBUG, database='postgres', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda$642/0x00000008004bc240@34d713a2, forceBinary='false', lockWaitTimeout='null, loopResources='null', multiHostConfiguration='null', noticeLogLevel='DEBUG', options='{}', password='****', preferAttachedBuffers=false, singleHostConfiguration=SingleHostConfiguration{host='127.0.0.1', port=5432, socket='null'}, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, timeZone=sun.util.calendar.ZoneInfo[id="Europe/Zurich",offset=3600000,dstSavings=3600000,useDaylight=true,transitions=119,lastRule=java.util.SimpleTimeZone[id=Europe/Zurich,offset=3600000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=2,startMonth=2,startDay=-1,startDayOfWeek=1,startTime=3600000,startTimeMode=2,endMode=2,endMonth=9,endDay=-1,endDayOfWeek=1,endTime=3600000,endTimeMode=2]], username='postgres'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@ce2d296, statementCache=IndefiniteStatementCache{cache={}, counter=0}}, messages=WindowFlux})
2024-02-12T13:20:51.813+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.1                 : onComplete()
2024-02-12T13:20:52.473+01:00  INFO 25449 --- [LeakingThread-2] reactor.Mono.UsingWhen.2                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:20:52.474+01:00  INFO 25449 --- [LeakingThread-2] reactor.Mono.UsingWhen.2                 : request(unbounded)

The BadThread, I'm not sure why, seems to kick in after the LeakingThread has finished running the query (onComplete()). Moreover, this time the query does seem to be running, I suppose, since the reactor-tcp-nio-1 thread shows up in the log.

@mp911de
Member

mp911de commented Feb 13, 2024

Releasing connection is logged upon subscription to the Connection.close() publisher. This indicates that usingWhen isn't subscribing to the publisher and in that regard, it is more likely a Reactor Core issue.
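
To make the reasoning above concrete, here is a minimal sketch of a cold publisher that logs only when it is subscribed to. It uses the JDK's `java.util.concurrent.Flow` types rather than Reactor or R2DBC, and `LazyCloseSketch`, `close()`, `NoOpSubscriber` and `LOG` are hypothetical stand-ins, not the pool's actual code. The point is only that calling `close()` produces no log line by itself, so a missing "Releasing connection" line suggests the returned publisher was never subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class LazyCloseSketch {

    // Collects log lines so the effect of subscribing is observable.
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical stand-in for Connection.close(): returns a cold publisher.
    // Nothing is logged until somebody subscribes to it.
    static Flow.Publisher<Void> close() {
        return subscriber -> {
            LOG.add("Releasing connection");
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onComplete();
        };
    }

    // Subscriber that ignores every signal; it exists only to trigger the subscription.
    static final class NoOpSubscriber implements Flow.Subscriber<Void> {
        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(Void item) { }
        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        Flow.Publisher<Void> closing = close();
        System.out.println("after close():     " + LOG);
        closing.subscribe(new NoOpSubscriber());
        System.out.println("after subscribe(): " + LOG);
    }
}
```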

@chemicL

chemicL commented Feb 26, 2024

@mp911de Hey :) I was summoned here from reactor/reactor-core#3695. Please note that the original reporter is not using the usingWhen operator. I am trying to understand the issues that users run into with the API and javadoc of that operator and I am failing to see where it might fail to deliver on its promises or where improvements are needed. This example seems at first to involve the operator, however, as just noted, the problem reported also manifests when this operator is not involved at all. Perhaps the issue lies elsewhere (either in other parts of reactor-core or in r2dbc)? I'd suggest investigating the problem that doesn't involve the usingWhen operator first; once that is fixed, we can see if usingWhen is still a source of issues or perhaps behaves as expected.

@agorbachenko
Author

agorbachenko commented Feb 26, 2024

@chemicL, hmm... As I understand, my example uses usingWhen implicitly via DefaultDatabaseClient's org.springframework.r2dbc.core.DefaultDatabaseClient#inConnection: SimpleR2dbcRepository -> R2dbcEntityTemplate implements R2dbcEntityOperations -> DatabaseClient. Doesn't it?

@chemicL

chemicL commented Feb 26, 2024

@agorbachenko thanks for the quick response. I was not aware of the internals in Spring, I just had a look at r2dbc sources. I'll have a look then!

@SimoneGiusso

I was not aware of the internals in Spring, I just had a look at r2dbc sources. I'll have a look then!

Btw @chemicL I revised the author's example, keeping the test logic and its goal of showing the issue. However, I removed the "Spring" layer, hoping that makes it easier to understand. I don't think you have to look at the Spring internals if you just look at the revised example here.

@chemicL

chemicL commented Feb 26, 2024

Thanks @SimoneGiusso. We experimented together with @mp911de today with both examples. Thanks for providing your version of it! What I can say is that usingWhen is definitely not at fault here; the culprit is the handling of cancellation in the phase before the connection is acquired. When the connection is abandoned before being handed over, it ends up in the pool. However, the execution never picks up the allocated connection nor cleans it up going forward. So I'll try to investigate a bit further and we can see together with @mp911de if something elsewhere in reactor-core is at fault, or the use of it in r2dbc, or in reactor-pool...

@mp911de mp911de added the for: external-project For an external project and not something we can fix label Feb 27, 2024
@mp911de
Member

mp911de commented Feb 27, 2024

Thanks to @chemicL's investigation it seems that the underlying Reactor Pool causes the issue. The related issue is reactor/reactor-pool#124. I think the problem is going to be addressed there so closing the ticket here.

@mp911de mp911de closed this as not planned Feb 27, 2024
@mp911de mp911de reopened this Mar 6, 2024
@mp911de mp911de added status: waiting-for-triage An issue we've not yet triaged and removed for: external-project For an external project and not something we can fix labels Mar 6, 2024
@mp911de
Member

mp911de commented Mar 6, 2024

@chemicL and I ran some investigations. For now, it seems that the issue only happens when there is an actual driver involved. So far, we tested only against Postgres. Using R2DBC Mocks doesn't reproduce the problem so we need to widen our scope again.

Anyone willing to help look into the cause is welcome.

@chemicL

chemicL commented Mar 6, 2024

What I have found is that the connection acquisition is working properly and the connection is not stuck in the pool, but is being held by an actor that fails to consume the responses for an ongoing query. The scenario is as follows:

  1. bad actor acquires the connection
  2. bad actor starts an exchange and sends the request
  3. a response is being generated and the bad actor starts consuming
  4. bad actor cancels before consuming the response entirely
  5. bad actor returns the connection to the pool
  6. good actor acquires the connection
  7. good actor sends a request
  8. good actor never receives the response

So I replicated that scenario, as observed with the Postgres driver, to check just the integration; the code is in https://github.com/chemicL/r2dbc-lab. However, all the tests pass in those mocked scenarios. Further investigation is needed.

It looks as if the reactor-pool issue mentioned above is not the root cause here.
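
The eight-step scenario above can be pictured with a small, single-threaded toy model. It is only a sketch under stated assumptions: it assumes that responses are routed to the oldest unfinished exchange on a connection and that each exchange buffers a bounded number of unconsumed frames. All names (`StuckConversationSketch`, `ToyConnection`, `Conversation`, `bufferCapacity`) are hypothetical and do not come from r2dbc-pool, reactor-pool or any driver. It does not reproduce the real bug; it only shows how step 8 could follow from step 4 if such a bounded buffer exists.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class StuckConversationSketch {

    /** One request/response exchange with a bounded buffer of unconsumed frames. */
    static final class Conversation {
        final int totalFrames;     // frames the server sends for this request
        final int bufferCapacity;  // assumed bound on unconsumed frames
        int received = 0;          // frames taken off the wire so far
        int buffered = 0;          // frames received but not yet consumed

        Conversation(int totalFrames, int bufferCapacity) {
            this.totalFrames = totalFrames;
            this.bufferCapacity = bufferCapacity;
        }

        boolean isComplete() { return received == totalFrames; }
        boolean canAccept() { return buffered < bufferCapacity; }
    }

    /** Hypothetical connection that serves exchanges strictly in order. */
    static final class ToyConnection {
        private final Deque<Conversation> conversations = new ArrayDeque<>();

        Conversation send(int frames, int bufferCapacity) {
            Conversation conversation = new Conversation(frames, bufferCapacity);
            conversations.addLast(conversation);
            pump();
            return conversation;
        }

        // Moves frames into the head exchange until it completes or its buffer is full.
        void pump() {
            while (!conversations.isEmpty()) {
                Conversation head = conversations.peekFirst();
                while (!head.isComplete() && head.canAccept()) {
                    head.received++;
                    head.buffered++;
                }
                if (!head.isComplete()) {
                    return; // stalled: nobody is draining the head exchange
                }
                conversations.removeFirst();
            }
        }

        void consume(Conversation conversation, int frames) {
            conversation.buffered -= Math.min(frames, conversation.buffered);
            pump();
        }
    }

    static boolean goodActorGetsResponse(int bufferCapacity) {
        ToyConnection connection = new ToyConnection();
        // Steps 1-3: the bad actor sends a request and starts consuming a 10-frame response.
        Conversation bad = connection.send(10, bufferCapacity);
        connection.consume(bad, 1);
        // Steps 4-5: the bad actor cancels; from here on nothing consumes its frames.
        // Steps 6-7: the good actor reuses the connection and sends a 1-frame request.
        Conversation good = connection.send(1, bufferCapacity);
        // Step 8: did the good actor's response arrive?
        return good.isComplete();
    }

    public static void main(String[] args) {
        System.out.println("capacity 4:  " + goodActorGetsResponse(4));
        System.out.println("capacity 16: " + goodActorGetsResponse(16));
    }
}
```

In this model the good actor is starved only when the bad actor's response is larger than the buffer. A mock that returns small or instantly completing results would therefore never hit the stall.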

@cyberluke

@chemicL Hello Dariusz, greetings to Krakow! Yesterday I checked your r2dbc-lab in order to reproduce the problem. But your current example is using H2, not Postgres. I had to rewrite it for Postgres. Anyway, I cannot reproduce the problem with the bad actor. Can we help each other?

@chemicL

chemicL commented Apr 26, 2024

Hey, @cyberluke 👋 I'm afraid I don't understand: r2dbc-lab has no actual dependency on H2 or PostgreSQL. It uses only mocks with r2dbc-pool, r2dbc-spi, and r2dbc-spi-test. Can you elaborate? Also, r2dbc-lab shows in fact that, using the primitives from the SPI, there is no issue: the problem is not reproducible in isolation, only using the examples provided by @SimoneGiusso and @agorbachenko.

@cyberluke

@chemicL You're right, too much work. That H2 database was used in an R2DBC test case to reproduce the bug from another issue here.

@blangenberg

Is there a status update on this issue?

@alexeykurshakov

alexeykurshakov commented Sep 25, 2024

@blangenberg @agorbachenko @mp911de If you increase the value of the "reactor.bufferSize.small" property, the issue goes away. The source of the problem is in the r2dbc-postgresql ReactorNettyClient class. The internal ReactorNettyClient.Conversation structure gets stuck if the data consumer has not consumed the data. And this is what happens when the bad thread cancels its request.
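
For anyone who wants to try the property mentioned above, here is a minimal sketch of one way to set it programmatically. The class name `BufferSizeWorkaround`, the helper `raiseSmallBufferSize` and the value 1024 are illustrative assumptions, not recommendations from this thread. As far as I know, Reactor reads this property once, when its `Queues` class is initialized (the default is 256), so it has to be set before any Reactor code runs. Passing `-Dreactor.bufferSize.small=1024` on the JVM command line should have the same effect. This only makes the stall less likely for responses that fit into the larger buffer; it is a mitigation rather than a fix.

```java
public class BufferSizeWorkaround {

    // Property name taken from the comment above.
    static final String SMALL_BUFFER_PROPERTY = "reactor.bufferSize.small";

    // Hypothetical helper: must be called before any Reactor class is loaded,
    // because the value is assumed to be read only once at class initialization.
    static void raiseSmallBufferSize(int size) {
        System.setProperty(SMALL_BUFFER_PROPERTY, Integer.toString(size));
    }

    public static void main(String[] args) {
        raiseSmallBufferSize(1024); // illustrative value, not a tuned recommendation
        System.out.println(SMALL_BUFFER_PROPERTY + "=" + System.getProperty(SMALL_BUFFER_PROPERTY));
        // ... start the application (e.g. the reproducer) after this point
    }
}
```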

@chemicL

chemicL commented Sep 25, 2024

@alexeykurshakov great finding! Thanks also for opening pgjdbc/r2dbc-postgresql#661 - linking it here for reference.
