Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duplicate (un-intended) commit with Spring Data R2dbc, causing "Async resource cleanup failed after onComplete" and "Transaction is already completed - do not call commit or rollback more than once per transaction" exceptions #28968

Closed
62mkv opened this issue Aug 16, 2022 · 5 comments
Assignees
Labels
in: data Issues in data modules (jdbc, orm, oxm, tx) status: duplicate A duplicate of another issue type: bug A general bug

Comments

@62mkv
Copy link

62mkv commented Aug 16, 2022

Affects: 5.3.10

I was planning to build a minimal reproducible project for this #28968, but now I have something even stranger, it might also be related to "spring-tx" but I still feel like it should be raised here originally. Because I (almost) never use spring-tx directly, so.. Anyway, let me know and I will move the issue to a proper project, if necessary.

So, we have a very small integration test: https://github.com/62mkv/spring-cockroach-transaction-retry/tree/duplicate-commit

This code demonstrates really weird (to me) behaviour: it tries to execute "COMMIT" twice, on a same transaction, which brings it first to "no transaction exists" exception and then (it seems) also tries to ROLLBACK, which also causes another exception again, this time from Spring infra.

Here's the logs I get:

2022-08-17 15:22:09.616  INFO 47728 --- [           main] victim                                   : | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
2022-08-17 15:22:09.618  INFO 47728 --- [           main] victim                                   : | request(32)
2022-08-17 15:22:09.625 DEBUG 47728 --- [           main] o.s.r.c.R2dbcTransactionManager          : Creating new transaction with name [null]: org.springframework.transaction.StaticTransactionDefinition@7db2b614
2022-08-17 15:22:10.269  INFO 47728 --- [           main] conflicting                              : | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
2022-08-17 15:22:10.269  INFO 47728 --- [           main] conflicting                              : | request(32)
2022-08-17 15:22:10.269 DEBUG 47728 --- [           main] o.s.r.c.R2dbcTransactionManager          : Creating new transaction with name [null]: org.springframework.transaction.StaticTransactionDefinition@7db2b614
2022-08-17 15:22:10.455 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2022-08-17 15:22:10.459 DEBUG 47728 --- [actor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager          : Acquired Connection [Mono.retry ? at io.r2dbc.pool.ConnectionPool.<init>(ConnectionPool.java:147)] for R2DBC transaction
2022-08-17 15:22:10.461 DEBUG 47728 --- [actor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager          : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@8fa15c0, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@26722cf1}]] to manual commit
2022-08-17 15:22:10.464 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: BEGIN
2022-08-17 15:22:10.513 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: INSERT INTO entity1 (name) VALUES ($1) RETURNING id
2022-08-17 15:22:10.558 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: INSERT INTO entity2 (name, parent_id) VALUES ($1, $2) RETURNING id
2022-08-17 15:22:10.575 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: SELECT entity2.* FROM entity2
2022-08-17 15:22:10.591 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: SELECT entity1.* FROM entity1
2022-08-17 15:22:10.634 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2022-08-17 15:22:10.634 DEBUG 47728 --- [actor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager          : Acquired Connection [Mono.retry ? at io.r2dbc.pool.ConnectionPool.<init>(ConnectionPool.java:147)] for R2DBC transaction
2022-08-17 15:22:10.634 DEBUG 47728 --- [actor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager          : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@1f75c477, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@29518079}]] to manual commit
2022-08-17 15:22:10.634 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: BEGIN
2022-08-17 15:22:10.642 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: INSERT INTO entity1 (name) VALUES ($1) RETURNING id
2022-08-17 15:22:10.649 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: INSERT INTO entity2 (name, parent_id) VALUES ($1, $2) RETURNING id
2022-08-17 15:22:10.664 DEBUG 47728 --- [actor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager          : Initiating transaction commit
2022-08-17 15:22:10.664 DEBUG 47728 --- [actor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager          : Committing R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@1f75c477, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@29518079}]]
2022-08-17 15:22:10.665 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: COMMIT
2022-08-17 15:22:10.682 DEBUG 47728 --- [actor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager          : Releasing R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@1f75c477, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@29518079}]] after transaction
2022-08-17 15:22:10.685  INFO 47728 --- [actor-tcp-nio-1] conflicting                              : | onNext(Entity1(id=89715b9c-013e-4f8a-aceb-8aa77a7a124e, name=name2))
2022-08-17 15:22:10.690  INFO 47728 --- [actor-tcp-nio-1] conflicting                              : | onComplete()
2022-08-17 15:22:13.618 DEBUG 47728 --- [     parallel-2] io.r2dbc.postgresql.QUERY                : Executing query: UPDATE entity1 SET name = $1 WHERE entity1.id = $2
2022-08-17 15:22:13.637 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: DELETE FROM entity2 WHERE entity2.parent_id = $1
2022-08-17 15:22:13.643 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: INSERT INTO entity2 (name, parent_id) VALUES ($1, $2) RETURNING id
2022-08-17 15:22:13.646  INFO 47728 --- [actor-tcp-nio-1] victim                                   : | onNext(Entity1(id=127a03a9-e092-4d11-842e-00437f4b244d, name=name1))
2022-08-17 15:22:13.646 DEBUG 47728 --- [actor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager          : Initiating transaction commit
2022-08-17 15:22:13.646 DEBUG 47728 --- [actor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager          : Committing R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@8fa15c0, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@26722cf1}]]
2022-08-17 15:22:13.646 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: COMMIT
2022-08-17 15:22:13.656 DEBUG 47728 --- [actor-tcp-nio-1] io.r2dbc.postgresql.QUERY                : Executing query: COMMIT
2022-08-17 15:22:13.663 ERROR 47728 --- [actor-tcp-nio-1] o.s.t.r.TransactionalOperatorImpl        : Application exception overridden by rollback exception

java.lang.RuntimeException: Async resource cleanup failed after onComplete
	at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:520) ~[reactor-core-3.4.21.jar:3.4.21]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxUsingWhen] :
	reactor.core.publisher.Flux.usingWhen(Flux.java:2121)
	org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$7(TransactionalOperatorImpl.java:98)
Error has been observed at the following site(s):
	*__Flux.usingWhen ? at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$7(TransactionalOperatorImpl.java:98)
Original Stack Trace:
		at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:520) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:97) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:97) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onError(FluxHandleFuseable.java:226) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onError(FluxPeekFuseable.java:553) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:347) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:814) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:739) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:635) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:887) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:761) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:667) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlNonTransientResourceException: there is no transaction in progress
	at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:109) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxHandle] :
	reactor.core.publisher.Flux.handle(Flux.java:5799)
	io.r2dbc.postgresql.PostgresqlConnection.exchange(PostgresqlConnection.java:447)
Error has been observed at the following site(s):
	*_____________________Flux.handle ? at io.r2dbc.postgresql.PostgresqlConnection.exchange(PostgresqlConnection.java:447)
	|_              Flux.doOnComplete ? at io.r2dbc.postgresql.PostgresqlConnection.lambda$commitTransaction$4(PostgresqlConnection.java:190)
	|_                    Flux.filter ? at io.r2dbc.postgresql.PostgresqlConnection.lambda$commitTransaction$4(PostgresqlConnection.java:191)
	|_                      Flux.cast ? at io.r2dbc.postgresql.PostgresqlConnection.lambda$commitTransaction$4(PostgresqlConnection.java:192)
	|_                    Flux.handle ? at io.r2dbc.postgresql.PostgresqlConnection.lambda$commitTransaction$4(PostgresqlConnection.java:193)
	*______________________Flux.defer ? at io.r2dbc.postgresql.PostgresqlConnection.useTransactionStatus(PostgresqlConnection.java:434)
	|_                      Flux.then ? at io.r2dbc.postgresql.PostgresqlConnection.useTransactionStatus(PostgresqlConnection.java:436)
	|_ InternalFluxOperator.subscribe ? at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49)
	*______________________Flux.defer ? at io.r2dbc.postgresql.PostgresqlConnection.useTransactionStatus(PostgresqlConnection.java:434)
	|_                      Flux.then ? at io.r2dbc.postgresql.PostgresqlConnection.useTransactionStatus(PostgresqlConnection.java:436)
	*_______________________Mono.then ? at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$doCleanupAfterCompletion$13(R2dbcTransactionManager.java:328)
	*_______________________Mono.then ? at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$doCleanupAfterCompletion$13(R2dbcTransactionManager.java:336)
	*______________________Mono.defer ? at org.springframework.r2dbc.connection.R2dbcTransactionManager.doCleanupAfterCompletion(R2dbcTransactionManager.java:314)
	*______________________Mono.defer ? at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.cleanupAfterCompletion(AbstractReactiveTransactionManager.java:674)
	*_______________________Mono.then ? at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.lambda$processCommit$25(AbstractReactiveTransactionManager.java:480)
	*______________Mono.onErrorResume ? at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:479)
	*_______________________Mono.then ? at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.processCommit(AbstractReactiveTransactionManager.java:480)
	*____________________Mono.flatMap ? at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.commit(AbstractReactiveTransactionManager.java:412)
Original Stack Trace:
		at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:109) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:65) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:132) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:324) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:814) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:739) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:635) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:887) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:761) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:667) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2022-08-17 15:22:13.664 ERROR 47728 --- [actor-tcp-nio-1] victim                                   : | onError(org.springframework.transaction.IllegalTransactionStateException: Transaction is already completed - do not call commit or rollback more than once per transaction)
2022-08-17 15:22:13.665 ERROR 47728 --- [actor-tcp-nio-1] victim                                   : 

org.springframework.transaction.IllegalTransactionStateException: Transaction is already completed - do not call commit or rollback more than once per transaction
	at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492) ~[spring-tx-5.3.22.jar:5.3.22]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoError] :
	reactor.core.publisher.Mono.error(Mono.java:314)
	org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492)
Error has been observed at the following site(s):
	*__________Mono.error ? at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492)
	|_    Mono.onErrorMap ? at org.springframework.transaction.reactive.TransactionalOperatorImpl.rollbackOnException(TransactionalOperatorImpl.java:119)
	*___________Mono.then ? at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$6(TransactionalOperatorImpl.java:105)
	*__Flux.onErrorResume ? at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$7(TransactionalOperatorImpl.java:104)
	*____Mono.flatMapMany ? at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$execute$8(TransactionalOperatorImpl.java:97)
	*____Mono.flatMapMany ? at org.springframework.transaction.reactive.TransactionalOperatorImpl.execute(TransactionalOperatorImpl.java:91)
	|_  Flux.contextWrite ? at org.springframework.transaction.reactive.TransactionalOperatorImpl.execute(TransactionalOperatorImpl.java:107)
	|_  Flux.contextWrite ? at org.springframework.transaction.reactive.TransactionalOperatorImpl.execute(TransactionalOperatorImpl.java:108)
Original Stack Trace:
		at org.springframework.transaction.reactive.AbstractReactiveTransactionManager.rollback(AbstractReactiveTransactionManager.java:492) ~[spring-tx-5.3.22.jar:5.3.22]
		at org.springframework.transaction.reactive.TransactionalOperatorImpl.rollbackOnException(TransactionalOperatorImpl.java:119) ~[spring-tx-5.3.22.jar:5.3.22]
		at org.springframework.transaction.reactive.TransactionalOperatorImpl.lambda$null$6(TransactionalOperatorImpl.java:105) ~[spring-tx-5.3.22.jar:5.3.22]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:398) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:521) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:97) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:97) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onError(FluxHandleFuseable.java:226) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onError(FluxPeekFuseable.java:553) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:347) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:814) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:739) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161) ~[reactor-core-3.4.21.jar:3.4.21]
		at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:635) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:887) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:761) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:667) ~[r2dbc-postgresql-0.9.1.RELEASE.jar:0.9.1.RELEASE]
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.4.21.jar:3.4.21]
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.21.jar:1.0.21]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2022-08-17 15:22:13.666  INFO 47728 --- [actor-tcp-nio-1] victim                                   : | cancel()

the original issue text:

I've already commented under commit diff, although I am not sure if it's visible to anyone, so I decided to post an issue too: b5e5e33#r81307010

In the current main version, that would be question about

	})).then(Mono.empty().onErrorResume(ex -> {

I both can not imagine how that onErrorResume could be activated for a MonoEmpty publisher; also I have not been able to trigger any breakpoint inside this lambda, even though I can confirm that code visits both assembly phase parts and lambdas before and after this code.

Thanks for checking this out!

Some context: I am fighting a situation where, faced with an exception while COMMIT, the code (instead of retrying transaction, as we would want to, per https://www.cockroachlabs.com/docs/v22.1/transactions.html#client-side-intervention) tries to issue yet another COMMIT, which of course fails and now it's not a retryable exception even.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Aug 16, 2022
@62mkv 62mkv changed the title Possibly bug in AbstractReactiveTransactionManager error handling Duplicate (un-intended) commit with Spring Data R2dbc, causing "Async resource cleanup failed after onComplete" and "Transaction is already completed - do not call commit or rollback more than once per transaction" exceptions Aug 18, 2022
@mp911de
Copy link
Member

mp911de commented Aug 25, 2022

There are a couple of things that come together:

  1. The commit exception handling is wrong and Mono.empty() does nothing (as pointed out). Fixing that issue reveals a couple other problems:
  2. The commit exception translation in R2dbcTransactionManager translates the commit failure into a DAO exception instead of a TransactionException. That leads to doRollbackOnCommitException which is unconditional. PlatformTransactionManager has a flag isRollbackOnCommitFailure that defaults to false.
  3. TransactionOperator's exception handling always calls rollbackOnException, rollbackOnException should happen only for exceptions during the actual action, not on commit errors. TransactionAspectSupport suffers from the same issue.
  4. The Postgres driver signals the error early on without consuming the final response frame so it attempts a transaction cleanup. I filed Await ReadyForQuery before emitting errors from transactional control methods pgjdbc/r2dbc-postgresql#541

The most difficult part was to simplify the reproducer. I'm going to submit a pull request to fix these issues but it can take a while because I'm currently busy with other tasks.

@poutsma poutsma added the in: data Issues in data modules (jdbc, orm, oxm, tx) label Jan 30, 2023
@jhoeller jhoeller added type: bug A general bug and removed status: waiting-for-triage An issue we've not yet triaged or decided on labels Feb 1, 2023
@jhoeller jhoeller added this to the 6.0.x milestone Feb 1, 2023
@sdeleuze sdeleuze modified the milestones: 6.0.x, 6.0.7 Mar 1, 2023
@sdeleuze
Copy link
Contributor

sdeleuze commented Mar 7, 2023

@mp911de If you get the time, please send a PR with your fix and a related test.

@mp911de
Copy link
Member

mp911de commented Mar 9, 2023

I tried to come up with a patch for this one:

Index: spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java
--- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java	(revision 63d841664c4b7b11451c25104dc0d66c220ba24f)
+++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java	(date 1677686286349)
@@ -433,7 +433,7 @@
 
 		AtomicBoolean beforeCompletionInvoked = new AtomicBoolean();
 
-		Mono<Object> commit = prepareForCommit(synchronizationManager, status)
+		Mono<Void> commit = prepareForCommit(synchronizationManager, status)
 				.then(triggerBeforeCommit(synchronizationManager, status))
 				.then(triggerBeforeCompletion(synchronizationManager, status))
 				.then(Mono.defer(() -> {
@@ -445,11 +445,11 @@
 						return doCommit(synchronizationManager, status);
 					}
 					return Mono.empty();
-				})).then(Mono.empty().onErrorResume(ex -> {
-					Mono<Object> propagateException = Mono.error(ex);
+				})).onErrorResume(ex -> {
+					Mono<Void> propagateException = Mono.error(ex);
 					// Store result in a local variable in order to appease the
 					// Eclipse compiler with regard to inferred generics.
-					Mono<Object> result = propagateException;
+					Mono<Void> result = propagateException;
 					if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) {
 						result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)
 								.then(propagateException);
@@ -471,7 +471,7 @@
 					}
 
 					return result;
-				})).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
+				}).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
 						triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
 						.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))));

The erroneous then(Mono.empty().onErrorResume(…)) needs to be a onErrorResume(…) to properly catch and handle errors if the underlying COMMIT fails. I hadn't had the bandwidth to come up with a proper test, in combination with TransactionalOperator the commit should not lead to another top-level-rollback but rather cleanup the transaction following the ErrorPredicates cleanup mechanisms (after-completion or doRollbackOnCommitException).

@simonbasle simonbasle self-assigned this Mar 9, 2023
@sdeleuze sdeleuze removed their assignment Mar 9, 2023
@simonbasle
Copy link
Contributor

I'll try to come up with a unit test that can reproduce this, and apply @mp911de 's patch (thanks Mark)

simonbasle added a commit to simonbasle/spring-framework that referenced this issue Mar 9, 2023
This change fixes a situation where error handling was skipped during
`processCommit()` in case the `doCommit()` failed. The error handling
was set up via an `onErrorResume` operator that was nested inside a
`then(...)`, applied to an inner `Mono.empty()`. As a consequence,
it would never receive an error signal (effectively decoupling the
onErrorResume from the main chain).

This change simply moves the error handling back one level up. It also
simplifies the `doCommit` code a bit by getting rid of the steps that
artificially introduce a `Mono<Object>` return type, which is not really
needed.

A pre-existing test was missing the fact that the rollback didn't occur,
which is now fixed. Another dedicated test is introduced building upon
the `ReactiveTestTransactionManager` class.

Closes spring-projectsgh-28968
Closes spring-projectsgh-30096
@sdeleuze
Copy link
Contributor

I am closing this issue in favor of the PR #30096.

@sdeleuze sdeleuze closed this as not planned Won't fix, can't repro, duplicate, stale Mar 10, 2023
@sdeleuze sdeleuze removed this from the 6.0.7 milestone Mar 10, 2023
@sdeleuze sdeleuze added the status: duplicate A duplicate of another issue label Mar 10, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
in: data Issues in data modules (jdbc, orm, oxm, tx) status: duplicate A duplicate of another issue type: bug A general bug
Projects
None yet
Development

No branches or pull requests

7 participants