Cannot invoke "java.lang.Comparable.compareTo(Object)" because "a[runHi]" is null #152

Closed
bilelncq opened this issue Nov 5, 2024 · 6 comments

Comments

@bilelncq

bilelncq commented Nov 5, 2024

Bug Report

Versions

r2dbc-proxy 1.1.4.RELEASE and 1.1.5.RELEASE
postgresql: 1.14.3
OS: linux

Current Behavior

I'm using Spring Boot 3.2.5. My service combines multiple Monos inside Mono.zip. Each Mono calls a repository method that runs a query and either returns an object or returns Mono.error with an exception. In my service I then use onErrorResume, based on that exception, to set some attributes.
Under load, with a Flux of 1000 Monos, I randomly get a NullPointerException that cancels my Flux.

I get this error in the console:

Stacktrace
2024-11-05 17:30:02.845 [reactor-tcp-epoll-5] ERROR r.n.channel.ChannelOperationsHandler - [0876be15, L:/127.0.0.1:51598 - R:localhost/127.0.0.1:32890] Error was received while reading the incoming data. The connection will be closed. 
java.lang.NullPointerException: Cannot invoke "java.lang.Comparable.compareTo(Object)" because "a[runHi]" is null
	at java.base/java.util.ComparableTimSort.countRunAndMakeAscending(ComparableTimSort.java:320)
	at java.base/java.util.ComparableTimSort.sort(ComparableTimSort.java:188)
	at java.base/java.util.Arrays.sort(Arrays.java:1042)
	at io.micrometer.common.KeyValues.<init>(KeyValues.java:47)
	at io.micrometer.common.KeyValues.of(KeyValues.java:269)
	at io.micrometer.observation.Observation$Context.getHighCardinalityKeyValues(Observation.java:1216)
	at io.micrometer.observation.Observation$Context.getAllKeyValues(Observation.java:1232)
	at io.micrometer.tracing.handler.TracingObservationHandler.tagSpan(TracingObservationHandler.java:52)
	at io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler.onStop(PropagatingSenderTracingObservationHandler.java:101)
	at io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler.onStop(PropagatingSenderTracingObservationHandler.java:35)
	at io.micrometer.observation.ObservationHandler$FirstMatchingCompositeObservationHandler.onStop(ObservationHandler.java:197)
	at io.micrometer.observation.SimpleObservation.lambda$notifyOnObservationStopped$0(SimpleObservation.java:268)
	at java.base/java.util.ArrayDeque$DescendingIterator.forEachRemaining(ArrayDeque.java:771)
	at io.micrometer.observation.SimpleObservation.notifyOnObservationStopped(SimpleObservation.java:268)
	at io.micrometer.observation.SimpleObservation.stop(SimpleObservation.java:188)
	at io.r2dbc.proxy.observation.ObservationProxyExecutionListener.afterQuery(ObservationProxyExecutionListener.java:183)
	at io.r2dbc.proxy.listener.CompositeProxyExecutionListener.lambda$afterQuery$3(CompositeProxyExecutionListener.java:58)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at io.r2dbc.proxy.listener.CompositeProxyExecutionListener.afterQuery(CompositeProxyExecutionListener.java:58)
	at io.r2dbc.proxy.callback.AfterQueryCallbackInvoker.afterQuery(AfterQueryCallbackInvoker.java:55)
	at io.r2dbc.proxy.callback.ResultInvocationSubscriber.afterQuery(ResultInvocationSubscriber.java:162)
	at io.r2dbc.proxy.callback.ResultInvocationSubscriber.onComplete(ResultInvocationSubscriber.java:101)
	at io.r2dbc.proxy.callback.MethodInvocationSubscriber.onComplete(MethodInvocationSubscriber.java:94)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at io.r2dbc.proxy.callback.ResultInvocationSubscriber.onComplete(ResultInvocationSubscriber.java:104)
	at io.r2dbc.proxy.callback.MethodInvocationSubscriber.onComplete(MethodInvocationSubscriber.java:94)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:261)
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:391)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)
	at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:465)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:871)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:819)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:249)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:215)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:206)
	at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:668)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:934)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:801)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Table schema

Input Code
SELECT * from my_table where column_1 = :value1 and column_2 = :value2 and date <= :date order by id desc Limit 1;

Expected behavior/code

I expect the Flux to finish successfully.

Possible Solution

As a workaround for now, we set r2dbc.observation.enabled=false to disable the observation.

Additional context

When debugging the KeyValues map, I see that it has size 2 but contains only one element, with the key r2dbc.query[0].
It looks like a concurrency issue, but I'm not sure.
In the screenshot, the key-value object passed as the parameter to the io.micrometer.common.KeyValues method is still being initialized in the stop method of the SimpleObservation class, which calls convention.getHighCardinalityKeyValues(context), related to io.r2dbc.proxy.observation.QueryObservationConvention.
[Screenshot: Untitled design (1)]
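
To illustrate the symptom described above, here is a minimal sketch using only the JDK. It assumes the array handed to Arrays.sort has room for two entries but only one is filled in, which is what "size 2 but only one element" would look like at the array level. The class name, the method name, and the second key ("example.key") are made up for illustration, and plain strings stand in for the real key-value objects. It does not reproduce the race itself, only the failure once a null slot reaches the sort.

```java
import java.util.Arrays;

public class NullSlotSortDemo {

    /** Returns true if sorting the given array throws a NullPointerException. */
    static boolean sortThrowsNpe(Object[] values) {
        try {
            // Same entry point as in the stack trace: Arrays.sort(Object[]) compares
            // elements through Comparable.compareTo, which cannot be invoked on null.
            Arrays.sort(values);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Hypothetical snapshot: two slots, only the first one populated.
        Object[] partiallyFilled = new Object[2];
        partiallyFilled[0] = "r2dbc.query[0]";

        // Same shape, but with both slots populated ("example.key" is made up).
        Object[] fullyFilled = {"r2dbc.query[0]", "example.key"};

        System.out.println("partially filled throws NPE: " + sortThrowsNpe(partiallyFilled)); // true
        System.out.println("fully filled throws NPE: " + sortThrowsNpe(fullyFilled));         // false
    }
}
```

If this matches what happens in the real code path, the open question is how a null slot gets into the array in the first place, for example if the underlying collection is modified while it is being copied.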

@ttddyy
Member

ttddyy commented Nov 11, 2024

@bilelncq Thanks for the report.

I'm wondering whether the zipped elements share one database connection, or whether each gets a different connection?

@bilelncq
Author

Hello @ttddyy, thank you for your reply.
I can confirm that with spring.r2dbc.pool.max-size=1 the issue is not reproduced, but with spring.r2dbc.pool.max-size=2 it occurs.

@ttddyy
Member

ttddyy commented Nov 13, 2024

Hm, that is strange.
Can you try with the latest version of r2dbc-pool?

Also, please make sure the r2dbc-proxy Connection is the outer-most Connection.
It should be: "r2dbc-proxy Connection" (outer) -> "r2dbc-pool Connection" -> "driver Connection" (inner)
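
A small sketch of that wrapping order, using only the JDK. The Conn interface and the driver/wrap helpers are hypothetical stand-ins and not the R2DBC, r2dbc-pool, or r2dbc-proxy API. The point is only that the last wrapper applied ends up outermost.

```java
public class WrappingOrderDemo {

    /** Hypothetical stand-in for a connection; not the R2DBC SPI. */
    interface Conn {
        String describe();
    }

    /** Innermost layer: plays the role of the driver connection. */
    static Conn driver() {
        return () -> "driver Connection";
    }

    /** Wraps an existing connection and puts the layer name in front of it. */
    static Conn wrap(String layerName, Conn inner) {
        return () -> layerName + " -> " + inner.describe();
    }

    public static void main(String[] args) {
        // Wrap the driver with the pool first, then wrap the pool with the proxy,
        // so that the proxy is the outermost layer.
        Conn pooled = wrap("r2dbc-pool Connection", driver());
        Conn proxied = wrap("r2dbc-proxy Connection", pooled);

        System.out.println(proxied.describe());
        // prints: r2dbc-proxy Connection -> r2dbc-pool Connection -> driver Connection
    }
}
```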

@bilelncq
Author

Sorry for the late reply, I didn't have time to recheck. In my test I'm using the R2DBC Spring Boot starter with its default configuration. I also have another implementation with my own connection factory, since we have multi-tenancy, and the issue occurs in both implementations.
I tried the latest r2dbc-pool version, 1.0.2.RELEASE, and the issue still occurs.

@ttddyy
Member

ttddyy commented Nov 19, 2024

Most likely this is the same issue as micrometer-metrics/micrometer#4356

@bilelncq
Author

Hello @ttddyy, I can confirm that the new version 1.14.2 of Micrometer fixed the issue. Thank you for your help.
