Connection not closed when parent coroutine timeout #1436

Closed
namjug-kim opened this issue Jan 3, 2023 · 32 comments · Fixed by #1810

@namjug-kim

I want to apply a timeout when using a connection obtained through MutinySessionFactory.
The code below reproduces the problem. In our real setup, a Netty-based API server applies timeouts to requests, and connections leak.
Once that happens, the pool hangs forever.

@Test
fun testTimeout() {
    println("execute")
    try {
        runBlocking {
            // Cancel the query if it takes longer than 150 ms
            withTimeout(Duration.ofMillis(150)) {
                try {
                    val withStatelessSession = sessionFactory.withStatelessSession { session ->
                        val createNativeQuery = session.createNativeQuery<Long>("SELECT COUNT(1) FROM User")
                        createNativeQuery.singleResultOrNull
                    }
                    println("result : ${withStatelessSession.awaitSuspending()}")
                } finally {
                    println("finally called")
                }
            }
        }
    } catch (e: Exception) {
        e.printStackTrace()
    }
    // Keep the test alive so the connection log can be observed
    CountDownLatch(1).await()
}

The expected behavior is that the connection is closed, as shown in the log below.

[] 2023-01-03 15:19:58,209 DEBUG [vert.x-eventloop-thread-0] o.h.r.j.i.LogicalConnectionMan: Initiating JDBC connection release from afterTransaction
[] 2023-01-03 15:19:58,210 TRACE [vert.x-eventloop-thread-0] o.h.r.p.i.SqlClientConnection: Connection closed: io.vertx.mysqlclient.impl.MySQLConnectionImpl@39e6b6f7

However, in the actual run the connection was not closed.

execute
[] 2023-01-03 15:15:31,600 DEBUG [Test worker @coroutine#1] o.h.r.m.i.MutinySessionFactory: No existing open Mutiny.StatelessSession was found in the current Vert.x context: opening a new instance
[] 2023-01-03 15:15:31,632 DEBUG [vert.x-eventloop-thread-0] i.n.channel.DefaultChannelId: -Dio.netty.processId: 54046 (auto-detected)
[] 2023-01-03 15:15:31,634 DEBUG [vert.x-eventloop-thread-0] i.n.channel.DefaultChannelId: -Dio.netty.machineId: f8:4d:89:ff:fe:69:71:a4 (auto-detected)
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.numHeapArenas: 20
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.numDirectArenas: 20
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.pageSize: 8192
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.maxOrder: 9
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.chunkSize: 4194304
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.smallCacheSize: 256
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.normalCacheSize: 64
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.maxCachedBufferCapacity: 32768
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.cacheTrimInterval: 8192
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.cacheTrimIntervalMillis: 0
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.useCacheForAllThreads: false
[] 2023-01-03 15:15:31,648 DEBUG [vert.x-eventloop-thread-0] i.n.b.PooledByteBufAllocator: -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
[] 2023-01-03 15:15:31,653 DEBUG [vert.x-eventloop-thread-0] io.netty.buffer.ByteBufUtil: -Dio.netty.allocator.type: pooled
[] 2023-01-03 15:15:31,653 DEBUG [vert.x-eventloop-thread-0] io.netty.buffer.ByteBufUtil: -Dio.netty.threadLocalDirectBufferSize: 0
[] 2023-01-03 15:15:31,653 DEBUG [vert.x-eventloop-thread-0] io.netty.buffer.ByteBufUtil: -Dio.netty.maxThreadLocalCharBufferSize: 16384
[] 2023-01-03 15:15:31,696 DEBUG [vert.x-eventloop-thread-0] io.netty.util.Recycler: -Dio.netty.recycler.maxCapacityPerThread: 4096
[] 2023-01-03 15:15:31,696 DEBUG [vert.x-eventloop-thread-0] io.netty.util.Recycler: -Dio.netty.recycler.ratio: 8
[] 2023-01-03 15:15:31,696 DEBUG [vert.x-eventloop-thread-0] io.netty.util.Recycler: -Dio.netty.recycler.chunkSize: 32
[] 2023-01-03 15:15:31,696 DEBUG [vert.x-eventloop-thread-0] io.netty.util.Recycler: -Dio.netty.recycler.blocking: false
[] 2023-01-03 15:15:31,699 DEBUG [vert.x-eventloop-thread-0] i.netty.buffer.AbstractByteBuf: -Dio.netty.buffer.checkAccessible: true
[] 2023-01-03 15:15:31,699 DEBUG [vert.x-eventloop-thread-0] i.netty.buffer.AbstractByteBuf: -Dio.netty.buffer.checkBounds: true
[] 2023-01-03 15:15:31,699 DEBUG [vert.x-eventloop-thread-0] i.n.u.ResourceLeakDetectorFact: Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@78c7da6e
[] 2023-01-03 15:15:31,716 TRACE [vert.x-eventloop-thread-0] o.h.r.p.i.SqlClientConnection: Connection created: io.vertx.mysqlclient.impl.MySQLConnectionImpl@39e6b6f7
[] 2023-01-03 15:15:31,753 DEBUG [vert.x-eventloop-thread-0] o.h.s.i.StatisticsInitiator: Statistics initialized [enabled=false]
finally called
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 150 ms
	(Coroutine boundary)
	at com.TestService$name$1$1.invokeSuspend(TestService.kt:47)
	at com.TestService$name$1.invokeSuspend(TestService.kt:41)
	(Coroutine creation stacktrace)
	at kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)
	at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at com.TestService.name(TestService.kt:40)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at jdk.proxy1/jdk.proxy1.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 150 ms
	at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:184)
	at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:154)
	at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:508)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)
	at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:108)
	at java.base/java.lang.Thread.run(Thread.java:1589)
[] 2023-01-03 15:15:31,765 DEBUG [vert.x-eventloop-thread-0] o.h.e.j.spi.SqlStatementLogger: SELECT COUNT(1) FROM User
[] 2023-01-03 15:15:31,787 DEBUG [vert.x-eventloop-thread-0] o.j.l.DelegatingBasicLogger: Result set row: 0
[] 2023-01-03 15:15:31,787 DEBUG [vert.x-eventloop-thread-0] org.hibernate.loader.Loader: Result row: 
[] 2023-01-03 15:15:31,791 DEBUG [vert.x-eventloop-thread-0] o.h.r.j.i.LogicalConnectionMan: Initiating JDBC connection release from afterTransaction

@namjug-kim
Author

namjug-kim commented Jan 3, 2023

In MutinySessionFactoryImpl#withSession there is no Uni#onCancellation handling. Is this intended?

// MutinySessionFactoryImpl
	private<S extends Mutiny.Closeable, T> Uni<T> withSession(
			Uni<S> sessionUni,
			Function<S, Uni<T>> work,
			Context.Key<S> contextKey) {
		return sessionUni.chain( session -> Uni.createFrom().voidItem()
				.invoke( () -> context.put( contextKey, session ) )
				.chain( () -> work.apply( session ) )
				.eventually( () -> context.remove( contextKey ) )
				.eventually(session::close)
		);
	}
// MutinyStatelessSessionImpl
		Uni<T> executeInTransaction(Function<Mutiny.Transaction, Uni<T>> work) {
			return work.apply( this )
					// in the case of an exception or cancellation
					// we need to rollback the transaction
					.onFailure().call( this::rollback )
					.onCancellation().call( this::rollback )
					// finally, when there was no exception,
					// commit or rollback the transaction
					.call( () -> rollback ? rollback() : commit() );
		}

I saw that the transaction code handles onCancellation. Is there a best practice for handling the cancel event in withSession and withTransaction?

@DavideD
Member

DavideD commented Jan 5, 2023

In MutinySessionFactoryImpl#withSession there is no Uni#onCancellation handling. Is this intended?

Mmmh... that's a good point. I hadn't thought of it.
I was expecting that using eventually would cover all the bases, but I haven't tested what happens on cancellation.
We will have to check.

@gavinking
Member

It seems to me that eventually() should do what you expect here, and if it doesn’t then that’s more like a bug.

@namjug-kim
Author

namjug-kim commented Jan 5, 2023

val cancellable = sessionFactory
    .withStatelessSession { it.createNativeQuery<Long>("SELECT COUNT(1) FROM User").singleResultOrNull }
    .subscribe()
    .with { println("success : $it") }

val timer = Timer()
timer.schedule(object : TimerTask() {
    override fun run() {
        cancellable.cancel()
    }
}, 150)

CountDownLatch(1).await()

eventually() is not invoked in this test.

Uni's eventually() has no handling for cancellation or termination:

This method is a shortcut for UniOnItemOrFailure.call(BiFunction): onItemOrFailure().call((item, err) -> supplier.get())

@DavideD
Member

DavideD commented Jan 5, 2023

I'm going to create an issue for Vert.x, but in the meantime, we should fix this.

@DavideD
Member

DavideD commented Jan 5, 2023

Sorry, I mean mutiny (not Vert.x)

@DavideD
Member

DavideD commented Jan 5, 2023

@cescoffier, @jponge What do you think?

@namjug-kim
Author

smallrye/smallrye-mutiny#178

Judging by the discussion of the eventually API in that issue, leaving termination and cancellation out of eventually seems to have been intentional.

@namjug-kim
Author

namjug-kim commented Jan 5, 2023

    private <S extends Mutiny.Closeable, T> Uni<T> withSession(
            Uni<S> sessionUni,
            Function<S, Uni<T>> work,
            Context.Key<S> contextKey) {
        AtomicBoolean connectionCloseCalled = new AtomicBoolean(false);
        return sessionUni.chain(session -> Uni.createFrom().voidItem()
                .invoke(() -> context.put(contextKey, session))
                .chain(() -> work.apply(session))
                .eventually(() -> {
                    if (!connectionCloseCalled.getAndSet(true)) {
                        return Uni.createFrom().voidItem()
                                .invoke(() -> context.remove(contextKey))
                                .chain(session::close);
                    } else {
                        return Uni.createFrom().voidItem();
                    }
                })
                .onCancellation().call(() -> {
                    if (!connectionCloseCalled.getAndSet(true)) {
                        return Uni.createFrom().voidItem()
                                .invoke(() -> context.remove(contextKey))
                                .chain(session::close);
                    } else {
                        return Uni.createFrom().voidItem();
                    }
                })
        );
    }

I tested this by also closing the session in onCancellation().

I expected it to work. But when the pool is full and a request is waiting in the queue, the connection later handed out from the queue gets stuck in the connection pool.
onCancellation() was executed on the thread that called cancel.

If the cancel happens inside the Vert.x context, it works fine for me :)
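
The guard that appears twice in the snippet above could be factored into a small run-once wrapper, so that the completion path and the cancellation path share one cleanup action. The following is only a minimal, library-free sketch: `OnceCleanupDemo` and `OnceCleanup` are hypothetical names, not part of Hibernate Reactive or Mutiny, and a plain `Runnable` stands in for the real `session.close()` chain.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class OnceCleanupDemo {
    /** Hypothetical helper: wraps a cleanup action so that it runs at most once. */
    static final class OnceCleanup implements Runnable {
        private final AtomicBoolean done = new AtomicBoolean(false);
        private final Runnable action;

        OnceCleanup(Runnable action) {
            this.action = action;
        }

        @Override
        public void run() {
            // compareAndSet lets exactly one caller win, even if the completion
            // path and the cancellation path race on different threads.
            if (done.compareAndSet(false, true)) {
                action.run();
            }
        }
    }

    /** Triggers the guarded cleanup from two paths and returns how often it actually ran. */
    static int closeCount() {
        AtomicInteger closes = new AtomicInteger();
        OnceCleanup cleanup = new OnceCleanup(closes::incrementAndGet);
        cleanup.run(); // e.g. triggered by an item or a failure
        cleanup.run(); // e.g. triggered by a cancellation
        return closes.get();
    }

    public static void main(String[] args) {
        System.out.println("close ran " + closeCount() + " time(s)");
    }
}
```

This only removes the duplication. It does not address which thread the cleanup runs on, which is the problem described in the last paragraphs of this comment.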

@gavinking
Member

Well it might have been intended but from this discussion it doesn’t seem like the current behavior actually achieves what you would want it to do from the user point of view.

@gavinking
Member

That is to say: it’s supposed to be a finally

@cescoffier

eventually() is not called on cancellation.
If you want cancellation, you need: onTermination().invoke(....), also called on cancellation.

Note that when this callback is invoked following a cancellation, it will be called on the thread having called cancel().

@cescoffier

(The question about whether eventually should be called on cancellation is something we need to think about)

@gavinking
Member

But then Clement, naively, that seems to make eventually() useless and misleading, since it can’t be used to do cleanup, which would seem to be its whole purpose. I would predict that almost everyone who calls that method expects it to be “always called”, and so almost every usage of it is actually broken.

Am I missing something here? What is the use case for a “finally” which sometimes doesn’t get called?

@cescoffier

@gavinking I agree, and actually, I had to try because I was pretty sure it was called on cancellation too. @jponge is going to open an issue on mutiny.

@jponge
Contributor

jponge commented Jan 5, 2023

eventually is currently always called for completion and error signals. So yes we should change the implementation since eventually exists primarily for cleanup purposes.

@gavinking
Member

OK, great perfect, thanks folks 🙏

@jponge
Contributor

jponge commented Jan 5, 2023

Meanwhile you can use .onTermination().invoke(Functions.TriConsumer<T, Throwable, Boolean> consumer) to handle all the cases (item received, failure, or cancelled)

@jponge
Contributor

jponge commented Jan 5, 2023

Note that I'll backport the fix to Mutiny 1.x, I'm due a 1.9.0.

@namjug-kim
Author

@jponge
Will the modified eventually in version 1.9.0 run in the same context as the upstream when it is triggered by a cancel signal?

@jponge
Contributor

jponge commented Jan 5, 2023

eventually will be called from the thread that forwards a cancel() signal. Just like eventually is called from the thread that forwards an item or a failure.

@cescoffier

@namjug-kim probably not. Mutiny does not have the notion of context (this comes from Vert.x, not mutiny which is just an API).

@namjug-kim
Author

Even if eventually handles cancel signals, that does not seem to solve this issue 🥲
When triggered by a cancel signal, eventually is called on the thread that called cancel().

.eventually( () -> context.remove( contextKey ) )
.eventually(session::close)

context.remove(contextKey) will not remove the session, because it does not run in the same Vert.x context that put the session there.
I don't know the Vert.x pool's behavior in detail, so I can't say why, but if the connection was acquired from the pool's wait queue, the pool gets stuck.

@jponge
Contributor

jponge commented Jan 6, 2023

You could do something like this (not sure it compiles or even works, it's a sketch):

Uni.createFrom().deferred(() -> {
  // Capture the Vert.x context when the Uni is created
  // (the static Vertx.currentContext() returns null outside a Vert.x thread)
  Context vertxContext = Vertx.currentContext();
  // Create the Uni as before
  return Uni.createFrom()
    // (...)
    .eventually(() -> {
      // With the Vert.x context, dispatch the session close on what should be the correct thread even if it's a cancellation
      // (runOnContext takes a Handler<Void>, so the lambda needs one argument)
      vertxContext.runOnContext(ignored -> session.close());
    });
})
// (...)

Makes sense?

@namjug-kim
Author

namjug-kim commented Jan 6, 2023

it makes sense..!

// if not in vertx context
Uni.createFrom().emitter(emitter -> vertxContext.runOnContext(handler -> session.close().subscribe().with(emitter::complete, emitter::fail)))

Apart from Mutiny, I'm not sure the cancel signal can be handled correctly.
A cancel signal can arrive at any point in the processing, and at some points it causes a permanent hang.

@Override
    public Uni<Mutiny.Session> openSession() {
        SessionCreationOptions options = options();
        return uni(() -> connection(options.getTenantIdentifier()))
                .chain(reactiveConnection -> create(reactiveConnection,
                        () -> new ReactiveSessionImpl(delegate, options, reactiveConnection)))
                .map(s -> new MutinySessionImpl(s, this));
    }

If () -> connection(options.getTenantIdentifier()) is cancelled while it is still in progress (for example, while enqueued as a pool waiter), the handler that should run once it leaves the wait queue is gone, and the pool hangs permanently.
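
To make the scenario above concrete, here is a toy, single-threaded model of a pool with a waiter queue. It is an assumption-laden sketch, not the Vert.x pool: `CancelledWaiterDemo`, `ToyPool`, and the `releaseWhenCancelled` switch are hypothetical and exist only to show how a connection handed to an already-cancelled waiter can stay checked out unless the handler gives it back.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class CancelledWaiterDemo {
    /** Toy single-connection pool with a waiter queue (single-threaded model only). */
    static final class ToyPool {
        private boolean free = true;
        private final Queue<Consumer<ToyPool>> waiters = new ArrayDeque<>();

        /** Hands the connection to the handler now, or queues the handler. */
        void acquire(Consumer<ToyPool> handler) {
            if (free) {
                free = false;
                handler.accept(this);
            } else {
                waiters.add(handler);
            }
        }

        /** Returns the connection; the next queued waiter, if any, receives it. */
        void release() {
            Consumer<ToyPool> next = waiters.poll();
            if (next != null) {
                next.accept(this);
            } else {
                free = true;
            }
        }

        boolean isFree() {
            return free;
        }
    }

    /** Returns whether the pool is free again after a queued waiter was cancelled. */
    static boolean poolFreeAfterCancelledWaiter(boolean releaseWhenCancelled) {
        ToyPool pool = new ToyPool();
        AtomicBoolean cancelled = new AtomicBoolean(false);

        pool.acquire(conn -> { /* first user holds the connection */ });
        pool.acquire(conn -> {
            if (cancelled.get()) {
                // The subscriber is gone: nothing downstream will close the session.
                if (releaseWhenCancelled) {
                    conn.release();
                }
                return;
            }
            conn.release(); // normal path: use the connection, then give it back
        });

        cancelled.set(true); // cancel while the second request is still queued
        pool.release();      // first user finishes; the queued waiter gets the connection
        return pool.isFree();
    }

    public static void main(String[] args) {
        System.out.println("leaky handler, pool free: " + poolFreeAfterCancelledWaiter(false));
        System.out.println("guarded handler, pool free: " + poolFreeAfterCancelledWaiter(true));
    }
}
```

In this model the leaky handler leaves the only connection checked out forever, while the guarded handler returns it. Whether the real pool behaves this way is exactly what the comment above is unsure about.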

@gavinking
Member

Makes sense?

Yeah, I don't think this is a problem, or at least, IIRC, it's a problem we already know how to solve, and already have code for, isn't that right @DavideD?

(FTR, you would have to solve the exact same problem, in the exact same way, if you did it via .onCancellation().call(), right?)

@jponge
Contributor

jponge commented Jan 6, 2023

(FTR, you would have to solve the exact same problem, in the exact same way, if you did it via .onCancellation().call(), right?)

Yes, looks like a threading issue where you need to get the proper Vert.x context for the code being run.

The deferred Uni creation idea above could be a simple solution to capture it when the processing starts, and make sure you can reuse it to dispatch later on with eventually, onXYZ, etc.
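
The capture-then-dispatch idea can be illustrated without Vert.x or Mutiny. In the sketch below, a single-threaded executor named `fake-event-loop` stands in for the captured Vert.x context, and a `canceller` thread stands in for whoever calls cancel(). All names are hypothetical, and this only demonstrates the threading pattern, not the actual fix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ContextDispatchDemo {
    /**
     * Requests cleanup from a "canceller" thread and returns the name of the
     * thread that actually ran it.
     */
    static String cleanupThreadName() {
        // Stand-in for the Vert.x context captured when processing starts.
        ExecutorService capturedContext =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-event-loop"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            // The cleanup callback does no work itself; it only re-dispatches
            // the work onto the captured context.
            Runnable cleanup = () -> capturedContext.execute(
                    () -> ranOn.complete(Thread.currentThread().getName()));

            // Simulate cancel() arriving from an unrelated thread.
            Thread canceller = new Thread(cleanup, "canceller");
            canceller.start();
            canceller.join();

            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            capturedContext.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran on: " + cleanupThreadName());
    }
}
```

Even though the cleanup is requested from the `canceller` thread, the work itself runs on `fake-event-loop`, which is the property the real code would need for `context.remove(contextKey)` and `session.close()`.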

jponge added a commit to smallrye/smallrye-mutiny that referenced this issue Jan 6, 2023
@DavideD
Member

DavideD commented Jan 6, 2023

Yeah, I don't think this is a problem, or at least, IIRC, it's a problem we already know how to solve, and already have code for, isn't that right @DavideD?

I think so.
The context is bound to the SessionFactory: if somebody uses withSession, we run everything in that context.

Is this what you mean?

namjug-kim added a commit to namjug-kim/hibernate-reactive that referenced this issue Jan 6, 2023
@namjug-kim
Author

namjug-kim@185bb98

I made a test to verify the places where the connection needs to be cleaned up.

To complete the task, it needs to be changed to use Hibernate Reactive's Context rather than the Vert.x Context. Can you please confirm that this is the correct approach?

@DavideD
Member

DavideD commented Jan 6, 2023

I think it would be better if you could first create a test case.
This way we can see the error and make sure that the fix actually solves it.

You can check one of the many tests we have in the test folder for inspiration. Check the ones extending BaseReactiveTest.

namjug-kim added a commit to namjug-kim/hibernate-reactive that referenced this issue Jan 9, 2023
@namjug-kim
Author

@DavideD
A failing test that reproduces the connection leak: namjug-kim@6d9bbe6
Fix: namjug-kim@5576dc1

I didn't add onCancellation to every Uni, but I think every Uni chain needs onCancellation to clean up resources.
I don't know whether this is the right way to fix it 🥲
This approach is prone to human error.

@DavideD
Member

DavideD commented Jan 9, 2023

Thanks @namjug-kim,
I will check this as soon as possible.

namjug-kim added a commit to namjug-kim/hibernate-reactive that referenced this issue Jan 18, 2023
@DavideD DavideD self-assigned this Jun 7, 2023
DavideD added a commit to DavideD/hibernate-reactive that referenced this issue Nov 30, 2023
@DavideD DavideD added this to the 2.2.1.Final milestone Jan 24, 2024