Parallelization issues in getLowCardinalityKeyValues #4356

Closed
roookeee opened this issue Nov 15, 2023 · 22 comments

Comments

@roookeee
roookeee commented Nov 15, 2023

Describe the bug
We recently updated to Spring Boot 3.x which changed its metric tagging to be based on the micrometer Observation object.
We are only creating one custom tag with a custom Spring @Bean which was migrated to an ObservationFilter:

@Bean
fun warmupTagObservationFilter(executor: WarmupExecutor): ObservationFilter = ObservationFilter {
    if (it is ServerRequestObservationContext) {
        it.addLowCardinalityKeyValue(KeyValue.of("warmup", "${executor.isInWarmup()}"))
    } else {
        it
    }
}

Once in a while we get the following errors (see collapsible sections below). We suspect they coincide with aborted HTTP connections, as they happen rarely and are not correlated with service load.

java.lang.ArrayIndexOutOfBoundsException · Index 6 out of bounds for length 6
LinkedHashMap.java:555 java.util.LinkedHashMap.valuesToArray
LinkedHashMap.java:639 java.util.LinkedHashMap$LinkedValues.toArray
KeyValues.java:268 io.micrometer.common.KeyValues.of
Observation.java:1148 io.micrometer.observation.Observation$Context.getLowCardinalityKeyValues
DefaultMeterObservationHandler.java:87 io.micrometer.core.instrument.observation.DefaultMeterObservationHandler.createTags
DefaultMeterObservationHandler.java:63 io.micrometer.core.instrument.observation.DefaultMeterObservationHandler.onStop
TracingAwareMeterObservationHandler.java:84 io.micrometer.tracing.handler.TracingAwareMeterObservationHandler.onStop
ObservationHandler.java:197 io.micrometer.observation.ObservationHandler$FirstMatchingCompositeObservationHandler.onStop
SimpleObservation.java:276 io.micrometer.observation.SimpleObservation.lambda$notifyOnObservationStopped$0
ArrayDeque.java:772 java.util.ArrayDeque$DescendingIterator.forEachRemaining
SimpleObservation.java:276 io.micrometer.observation.SimpleObservation.notifyOnObservationStopped
SimpleObservation.java:196 io.micrometer.observation.SimpleObservation.stop
ServerHttpObservationFilter.java:129 org.springframework.web.filter.reactive.ServerHttpObservationFilter.onTerminalSignal
ServerHttpObservationFilter.java:117 org.springframework.web.filter.reactive.ServerHttpObservationFilter.lambda$filter$2
FluxDoOnEach.java:186 reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onError
FluxDoOnEach.java:218 reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onComplete
FluxOnAssembly.java:549 reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete
MonoFlatMap.java:250 reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete
MonoFlatMap.java:324 reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete
MonoFlatMap.java:250 reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete
MonoFlatMap.java:324 reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete
FluxOnAssembly.java:549 reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete
FluxContextWrite.java:126 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete
MonoFlatMap.java:250 reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete
MonoFlatMap.java:324 reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete
FluxContextWrite.java:126 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete
MonoPeekTerminal.java:299 reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete
MonoFlatMap.java:250 reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete
MonoFlatMap.java:324 reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete
FluxPeekFuseable.java:277 reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete
MonoPeekTerminal.java:299 reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete
MonoIgnoreElements.java:89 reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete
FluxConcatArray.java:230 reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete
FluxConcatArray.java:78 reactor.core.publisher.FluxConcatArray.subscribe
InternalMonoOperator.java:64 reactor.core.publisher.InternalMonoOperator.subscribe
MonoFlatMap.java:165 reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext
FluxContextWrite.java:107 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext
Operators.java:2545 reactor.core.publisher.Operators$ScalarSubscription.request
FluxContextWrite.java:136 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request
MonoFlatMap.java:194 reactor.core.publisher.MonoFlatMap$FlatMapMain.request
MonoPeekTerminal.java:139 reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request
FluxContextWrite.java:136 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request
MonoFlatMap.java:291 reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe
FluxContextWrite.java:101 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe
MonoPeekTerminal.java:152 reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe
MonoFlatMap.java:117 reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe
FluxContextWrite.java:101 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe
MonoJust.java:55 reactor.core.publisher.MonoJust.subscribe
InternalMonoOperator.java:64 reactor.core.publisher.InternalMonoOperator.subscribe
MonoFlatMap.java:165 reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext
FluxSwitchIfEmpty.java:74 reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext
Operators.java:2811 reactor.core.publisher.Operators$MonoInnerProducerBase.complete
MonoSingle.java:180 reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete
FluxMapFuseable.java:152 reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete
Operators.java:2547 reactor.core.publisher.Operators$ScalarSubscription.request
FluxMapFuseable.java:171 reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request
MonoSingle.java:103 reactor.core.publisher.MonoSingle$SingleSubscriber.doOnRequest
Operators.java:2878 reactor.core.publisher.Operators$MonoInnerProducerBase.request
Operators.java:2341 reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set
Operators.java:2215 reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe
MonoSingle.java:115 reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe
FluxMapFuseable.java:96 reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe
MonoJust.java:55 reactor.core.publisher.MonoJust.subscribe
FluxFromMonoOperator.java:83 reactor.core.publisher.FluxFromMonoOperator.subscribe
FluxDeferContextual.java:57 reactor.core.publisher.FluxDeferContextual.subscribe
InternalMonoOperator.java:64 reactor.core.publisher.InternalMonoOperator.subscribe
MonoFlatMap.java:165 reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext
FluxMap.java:122 reactor.core.publisher.FluxMap$MapSubscriber.onNext
MonoCreate.java:172 reactor.core.publisher.MonoCreate$DefaultMonoSink.success
Mono.kt:101 kotlinx.coroutines.reactor.MonoCoroutine.onCompleted
AbstractCoroutine.kt:93 kotlinx.coroutines.AbstractCoroutine.onCompletionInternal
JobSupport.kt:294 kotlinx.coroutines.JobSupport.tryFinalizeSimpleState
JobSupport.kt:856 kotlinx.coroutines.JobSupport.tryMakeCompleting
JobSupport.kt:828 kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core
AbstractCoroutine.kt:100 kotlinx.coroutines.AbstractCoroutine.resumeWith
ContinuationImpl.kt:46 kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
Scopes.kt:33 kotlinx.coroutines.internal.ScopeCoroutine.afterResume
AbstractCoroutine.kt:102 kotlinx.coroutines.AbstractCoroutine.resumeWith
ContinuationImpl.kt:46 kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
DispatchedTask.kt:234 kotlinx.coroutines.DispatchedTaskKt.resume
DispatchedTask.kt:190 kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined
DispatchedTask.kt:161 kotlinx.coroutines.DispatchedTaskKt.dispatch
CancellableContinuationImpl.kt:397 kotlinx.coroutines.CancellableContinuationImpl.dispatchResume
CancellableContinuationImpl.kt:431 kotlinx.coroutines.CancellableContinuationImpl.resumeImpl
CancellableContinuationImpl.kt:420 kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default
CancellableContinuationImpl.kt:328 kotlinx.coroutines.CancellableContinuationImpl.resumeWith
Future.kt:188 kotlinx.coroutines.future.ContinuationConsumer.accept
Future.kt:180 kotlinx.coroutines.future.ContinuationConsumer.accept
CompletableFuture.java:863 java.util.concurrent.CompletableFuture.uniWhenComplete
CompletableFuture.java:841 java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
CompletableFuture.java:510 java.util.concurrent.CompletableFuture.postComplete
CompletableFuture.java:2147 java.util.concurrent.CompletableFuture.complete
AbstractAsyncExecutionStrategy.java:38 graphql.execution.AbstractAsyncExecutionStrategy.lambda$handleResults$0
CompletableFuture.java:863 java.util.concurrent.CompletableFuture.uniWhenComplete
CompletableFuture.java:887 java.util.concurrent.CompletableFuture.uniWhenCompleteStage
CompletableFuture.java:2325 java.util.concurrent.CompletableFuture.whenComplete
AsyncExecutionStrategy.java:72 graphql.execution.AsyncExecutionStrategy.lambda$execute$1
CompletableFuture.java:863 java.util.concurrent.CompletableFuture.uniWhenComplete
CompletableFuture.java:841 java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
CompletableFuture.java:510 java.util.concurrent.CompletableFuture.postComplete
CompletableFuture.java:2147 java.util.concurrent.CompletableFuture.complete
Async.java:87 graphql.execution.Async$Single.lambda$await$1
CompletableFuture.java:863 java.util.concurrent.CompletableFuture.uniWhenComplete
CompletableFuture.java:841 java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
CompletableFuture.java:510 java.util.concurrent.CompletableFuture.postComplete
CompletableFuture.java:2147 java.util.concurrent.CompletableFuture.complete
DataLoaderHelper.java:259 org.dataloader.DataLoaderHelper.lambda$dispatchQueueBatch$2
CompletableFuture.java:646 java.util.concurrent.CompletableFuture$UniApply.tryFire
CompletableFuture.java:510 java.util.concurrent.CompletableFuture.postComplete
CompletableFuture.java:1773 java.util.concurrent.CompletableFuture$AsyncSupply.run
ThreadPoolExecutor.java:1136 java.util.concurrent.ThreadPoolExecutor.runWorker
ThreadPoolExecutor.java:635 java.util.concurrent.ThreadPoolExecutor$Worker.run
Thread.java:840 java.lang.Thread.run
java.lang.NullPointerException · Cannot invoke "java.lang.Comparable.compareTo(Object)" because "a[runHi]" is null
ComparableTimSort.java:325 java.util.ComparableTimSort.countRunAndMakeAscending
ComparableTimSort.java:188 java.util.ComparableTimSort.sort
Arrays.java:1041 java.util.Arrays.sort
KeyValues.java:47 io.micrometer.common.KeyValues.<init>
KeyValues.java:268 io.micrometer.common.KeyValues.of
Observation.java:1148 io.micrometer.observation.Observation$Context.getLowCardinalityKeyValues
DefaultMeterObservationHandler.java:87 io.micrometer.core.instrument.observation.DefaultMeterObservationHandler.createTags
DefaultMeterObservationHandler.java:63 io.micrometer.core.instrument.observation.DefaultMeterObservationHandler.onStop
TracingAwareMeterObservationHandler.java:84 io.micrometer.tracing.handler.TracingAwareMeterObservationHandler.onStop
ObservationHandler.java:197 io.micrometer.observation.ObservationHandler$FirstMatchingCompositeObservationHandler.onStop
SimpleObservation.java:276 io.micrometer.observation.SimpleObservation.lambda$notifyOnObservationStopped$0
ArrayDeque.java:772 java.util.ArrayDeque$DescendingIterator.forEachRemaining
SimpleObservation.java:276 io.micrometer.observation.SimpleObservation.notifyOnObservationStopped
SimpleObservation.java:196 io.micrometer.observation.SimpleObservation.stop
ServerHttpObservationFilter.java:129 org.springframework.web.filter.reactive.ServerHttpObservationFilter.onTerminalSignal
ServerHttpObservationFilter.java:117 org.springframework.web.filter.reactive.ServerHttpObservationFilter.lambda$filter$2
FluxDoOnEach.java:186 reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onError
FluxDoOnEach.java:218 reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onComplete
FluxOnAssembly.java:549 reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete
MonoFlatMap.java:250 reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete
MonoFlatMap.java:324 reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete
MonoFlatMap.java:250 reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete
MonoFlatMap.java:324 reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete
FluxOnAssembly.java:549 reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete
FluxContextWrite.java:126 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete
MonoFlatMap.java:250 reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete
MonoFlatMap.java:324 reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete
FluxContextWrite.java:126 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete
MonoPeekTerminal.java:299 reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete
MonoFlatMap.java:250 reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete
MonoFlatMap.java:324 reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete
FluxPeekFuseable.java:277 reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete
MonoPeekTerminal.java:299 reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete
MonoIgnoreElements.java:89 reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete
FluxConcatArray.java:230 reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete
FluxConcatArray.java:78 reactor.core.publisher.FluxConcatArray.subscribe
InternalMonoOperator.java:64 reactor.core.publisher.InternalMonoOperator.subscribe
MonoFlatMap.java:165 reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext
FluxContextWrite.java:107 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext
Operators.java:2545 reactor.core.publisher.Operators$ScalarSubscription.request
FluxContextWrite.java:136 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request
MonoFlatMap.java:194 reactor.core.publisher.MonoFlatMap$FlatMapMain.request
MonoPeekTerminal.java:139 reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request
FluxContextWrite.java:136 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request
MonoFlatMap.java:291 reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe
FluxContextWrite.java:101 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe
MonoPeekTerminal.java:152 reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe
MonoFlatMap.java:117 reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe
FluxContextWrite.java:101 reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe
MonoJust.java:55 reactor.core.publisher.MonoJust.subscribe
InternalMonoOperator.java:64 reactor.core.publisher.InternalMonoOperator.subscribe
MonoFlatMap.java:165 reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext
FluxSwitchIfEmpty.java:74 reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext
Operators.java:2811 reactor.core.publisher.Operators$MonoInnerProducerBase.complete
MonoSingle.java:180 reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete
FluxMapFuseable.java:152 reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete
Operators.java:2547 reactor.core.publisher.Operators$ScalarSubscription.request
FluxMapFuseable.java:171 reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request
MonoSingle.java:103 reactor.core.publisher.MonoSingle$SingleSubscriber.doOnRequest
Operators.java:2878 reactor.core.publisher.Operators$MonoInnerProducerBase.request
Operators.java:2341 reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set
Operators.java:2215 reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe
MonoSingle.java:115 reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe
FluxMapFuseable.java:96 reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe
MonoJust.java:55 reactor.core.publisher.MonoJust.subscribe
FluxFromMonoOperator.java:83 reactor.core.publisher.FluxFromMonoOperator.subscribe
FluxDeferContextual.java:57 reactor.core.publisher.FluxDeferContextual.subscribe
InternalMonoOperator.java:64 reactor.core.publisher.InternalMonoOperator.subscribe
MonoFlatMap.java:165 reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext
FluxMap.java:122 reactor.core.publisher.FluxMap$MapSubscriber.onNext
MonoCreate.java:172 reactor.core.publisher.MonoCreate$DefaultMonoSink.success
Mono.kt:101 kotlinx.coroutines.reactor.MonoCoroutine.onCompleted
AbstractCoroutine.kt:93 kotlinx.coroutines.AbstractCoroutine.onCompletionInternal
JobSupport.kt:294 kotlinx.coroutines.JobSupport.tryFinalizeSimpleState
JobSupport.kt:856 kotlinx.coroutines.JobSupport.tryMakeCompleting
JobSupport.kt:828 kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core
AbstractCoroutine.kt:100 kotlinx.coroutines.AbstractCoroutine.resumeWith
ContinuationImpl.kt:46 kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
Scopes.kt:33 kotlinx.coroutines.internal.ScopeCoroutine.afterResume
AbstractCoroutine.kt:102 kotlinx.coroutines.AbstractCoroutine.resumeWith
ContinuationImpl.kt:46 kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
DispatchedTask.kt:234 kotlinx.coroutines.DispatchedTaskKt.resume
DispatchedTask.kt:190 kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined
DispatchedTask.kt:161 kotlinx.coroutines.DispatchedTaskKt.dispatch
CancellableContinuationImpl.kt:397 kotlinx.coroutines.CancellableContinuationImpl.dispatchResume
CancellableContinuationImpl.kt:431 kotlinx.coroutines.CancellableContinuationImpl.resumeImpl
CancellableContinuationImpl.kt:420 kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default
CancellableContinuationImpl.kt:328 kotlinx.coroutines.CancellableContinuationImpl.resumeWith
Future.kt:188 kotlinx.coroutines.future.ContinuationConsumer.accept
Future.kt:180 kotlinx.coroutines.future.ContinuationConsumer.accept
CompletableFuture.java:863 java.util.concurrent.CompletableFuture.uniWhenComplete
CompletableFuture.java:841 java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
CompletableFuture.java:510 java.util.concurrent.CompletableFuture.postComplete
CompletableFuture.java:2147 java.util.concurrent.CompletableFuture.complete
AbstractAsyncExecutionStrategy.java:38 graphql.execution.AbstractAsyncExecutionStrategy.lambda$handleResults$0
CompletableFuture.java:863 java.util.concurrent.CompletableFuture.uniWhenComplete
CompletableFuture.java:887 java.util.concurrent.CompletableFuture.uniWhenCompleteStage
CompletableFuture.java:2325 java.util.concurrent.CompletableFuture.whenComplete
AsyncExecutionStrategy.java:72 graphql.execution.AsyncExecutionStrategy.lambda$execute$1
CompletableFuture.java:863 java.util.concurrent.CompletableFuture.uniWhenComplete
CompletableFuture.java:841 java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
CompletableFuture.java:510 java.util.concurrent.CompletableFuture.postComplete
CompletableFuture.java:2147 java.util.concurrent.CompletableFuture.complete
Async.java:87 graphql.execution.Async$Single.lambda$await$1
CompletableFuture.java:863 java.util.concurrent.CompletableFuture.uniWhenComplete
CompletableFuture.java:841 java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
CompletableFuture.java:510 java.util.concurrent.CompletableFuture.postComplete
CompletableFuture.java:2147 java.util.concurrent.CompletableFuture.complete
DataLoaderHelper.java:259 org.dataloader.DataLoaderHelper.lambda$dispatchQueueBatch$2
CompletableFuture.java:646 java.util.concurrent.CompletableFuture$UniApply.tryFire
CompletableFuture.java:510 java.util.concurrent.CompletableFuture.postComplete
CompletableFuture.java:1773 java.util.concurrent.CompletableFuture$AsyncSupply.run
ThreadPoolExecutor.java:1136 java.util.concurrent.ThreadPoolExecutor.runWorker
ThreadPoolExecutor.java:635 java.util.concurrent.ThreadPoolExecutor$Worker.run
Thread.java:840 java.lang.Thread.run

We dug a bit and determined that the index-out-of-bounds error can only occur when addLowCardinalityKeyValue and getLowCardinalityKeyValues are called in parallel; see the following code flow of Micrometer's Observation.Context.getLowCardinalityKeyValues() and the inline comments:

// note: these are excerpts from micrometer and java stdlib classes

public KeyValues Observation.Context.getLowCardinalityKeyValues() {
    return KeyValues.of(this.lowCardinalityKeyValues.values());
}

public static KeyValues of(@Nullable Iterable<? extends KeyValue> keyValues) {
    // other branches omitted
    else if (keyValues instanceof Collection) {
        Collection<? extends KeyValue> keyValuesCollection = (Collection<? extends KeyValue>) keyValues;
        return new KeyValues(keyValuesCollection.toArray(new KeyValue[0]));
    }
    // other branches omitted
}

public <T> T[] LinkedHashMap$LinkedValues.toArray(T[] a) {
  // prepareArray creates an appropriately sized array as a 0 length array was passed
  return valuesToArray(prepareArray(a));
}

@Override
final <T> T[] LinkedHashMap.valuesToArray(T[] a) {
    Object[] r = a;
    int idx = 0;
    // this can only index out of bound when prepareArray returned an array of size X and the map was modified after prepareArray was called
    for (LinkedHashMap.Entry<K,V> e = head; e != null; e = e.after) {
        r[idx++] = e.value;
    }
    return a;
}
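
To make the interleaving above concrete without relying on thread timing, here is a minimal single-threaded sketch. It is not Micrometer or JDK code: `ToArrayInterleavingSketch`, `copyValues`, `sixTags` and `outcome` are hypothetical names, and the `betweenSteps` callback merely stands in for whatever another thread might do between sizing the array and walking the entries. The removal in the second case is only a convenient, deterministic way to end up with a copy that is one element short; whether a real race produces the short copy that way or through a stale view of the map is an assumption the traces do not settle.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

class ToArrayInterleavingSketch {

    /**
     * Copies the map's values in two separate steps, like the excerpt above,
     * and runs betweenSteps in the gap where another thread could touch the map.
     */
    static String[] copyValues(Map<String, String> map, Runnable betweenSteps) {
        String[] r = new String[map.size()];   // step 1: size the array (like prepareArray)
        betweenSteps.run();                    // stand-in for a concurrent writer
        int idx = 0;
        for (String value : map.values()) {    // step 2: walk the entries (like valuesToArray)
            r[idx++] = value;
        }
        return r;
    }

    /** Builds a map with six entries, matching "length 6" in the reported error. */
    static Map<String, String> sixTags() {
        Map<String, String> tags = new LinkedHashMap<>();
        for (int i = 0; i < 6; i++) {
            tags.put("key" + i, "value" + i);
        }
        return tags;
    }

    /** Returns the simple name of the exception thrown by the action, or "none". */
    static String outcome(Runnable action) {
        try {
            action.run();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        Map<String, String> grown = sixTags();
        // A seventh entry appears after the array was sized for six.
        System.out.println(outcome(() -> copyValues(grown, () -> grown.put("warmup", "false"))));
        // prints ArrayIndexOutOfBoundsException

        Map<String, String> shrunk = sixTags();
        // The copy ends up one element short, so the last slot stays null and sorting fails.
        System.out.println(outcome(() -> Arrays.sort(copyValues(shrunk, () -> shrunk.remove("key5")))));
        // prints NullPointerException
    }
}
```

Both outcomes need the map to change between the two steps, which is consistent with the suspicion that a second thread is involved.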

We found an existing issue for the other error (the NullPointerException), but it was closed without a fix that resolves the error for us.

Environment

  • Micrometer version: 1.11.5
  • Micrometer registry: prometheus
  • OS: Linux
  • Java version: 17

Minimal reproducer
We are trying to create a minimal reproducer, but this error only occurs about 10 times over a period of 24 hours with a constant load of > 300 requests / second and a peak load of around 1500 requests / second (across 4 instances of our service).

Expected behavior
Adding custom tags via an ObservationFilter should not create errors.

Additional context
We currently expect this issue to occur when a client's in-flight HTTP request is terminated abnormally. We want to tag such failing requests and their metrics anyway. We just thought this might help find the root cause.

@jonatan-ivanov
Member

Observation.Context is not thread-safe (it shouldn't be), its methods should not be called concurrently.

We are trying to create a minimal reproducer

That would be appreciated.

Adding custom tags via an ObservationFilter should not create errors.

I doubt this is caused by the filter. Check what happens when you stop an Observation: the filter is called first then the handlers are notified sequentially. This cannot happen concurrently.
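
As a rough illustration of that ordering, here is a simplified model in plain Java. It is not Micrometer's implementation: `StopOrderSketch`, `stop` and `demo` are hypothetical names, and the context is reduced to a list of tag strings. The only detail taken from this thread is that handlers are walked in reverse (the traces show `ArrayDeque$DescendingIterator` under `notifyOnObservationStopped`).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

class StopOrderSketch {

    /** Simplified "stop": every filter runs first, then every handler, all on the calling thread. */
    static List<String> stop(List<String> tags,
                             List<UnaryOperator<List<String>>> filters,
                             Deque<Consumer<List<String>>> handlers) {
        List<String> current = tags;
        for (UnaryOperator<List<String>> filter : filters) {
            current = filter.apply(current);       // filters may still add tags here
        }
        Iterator<Consumer<List<String>>> it = handlers.descendingIterator();
        while (it.hasNext()) {
            it.next().accept(current);             // handlers only read the finished tag set
        }
        return current;
    }

    /** Wires one filter and two handlers and returns the order in which they were called. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();

        List<UnaryOperator<List<String>>> filters = new ArrayList<>();
        filters.add(tags -> {
            log.add("filter");
            tags.add("warmup=false");
            return tags;
        });

        Deque<Consumer<List<String>>> handlers = new ArrayDeque<>();
        handlers.add(tags -> log.add("handler A sees " + tags));
        handlers.add(tags -> log.add("handler B sees " + tags));

        List<String> tags = new ArrayList<>();
        tags.add("method=GET");
        stop(tags, filters, handlers);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
        // filter
        // handler B sees [method=GET, warmup=false]
        // handler A sees [method=GET, warmup=false]
    }
}
```

In this model the filter's write always completes before any handler reads, so a conflict would need a writer on a different thread.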

I think this is caused by something adding a KeyValue to the LinkedHashMap in Observation.Context concurrently from another thread outside of Micrometer.

Could you please upgrade Spring and Micrometer first and check if it is still an issue?

@jonatan-ivanov added the "waiting for feedback" label (We need additional information before we can continue) and removed the "waiting-for-triage" label on Nov 15, 2023
@AndreasEK

Hi @jonatan-ivanov, thanks for taking a look at this. I'm a colleague of roookeee, who is out of office today, so let me jump in.
As far as I understand the issue, (a) the filter adds a custom tag to the Observation.Context while, at the same time, the client connection crashes, so (b) the observation is stopped.

=> Both the NullPointerException and the IndexOutOfBounds originated from:
ServerHttpObservationFilter.java:129 org.springframework.web.filter.reactive.ServerHttpObservationFilter.onTerminalSignal

I understand this is a super rare case, which is why we only see it a few times a day. My assumption is that onTerminalSignal runs on a different thread than the observation, so both can end up modifying/reading the underlying LinkedHashMap simultaneously.

Are there other means for us to make sure that usage pattern cannot happen? Like injecting a ConcurrentHashMap, overriding the SimpleObserver, … ?
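
For illustration only, this is roughly what guarding both sides with one lock looks like in plain Java. It is not a Micrometer API and does not patch Observation.Context; `GuardedTagsSketch`, `put`, `snapshot` and `stress` are hypothetical names. The point of the sketch is that the writer and the reader have to share the same lock, which application code cannot retrofit onto a map it does not own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class GuardedTagsSketch {

    private final Map<String, String> tags = new LinkedHashMap<>();

    /** Writers take the same lock as readers, so the map cannot change mid-copy. */
    synchronized void put(String key, String value) {
        tags.put(key, value);
    }

    /** Returns a private copy; sizing and filling the array happen under one lock. */
    synchronized String[] snapshot() {
        return tags.values().toArray(new String[0]);
    }

    /** Hammers one instance from several threads and returns the final number of tags. */
    static int stress(int writers, int perWriter) {
        GuardedTagsSketch guarded = new GuardedTagsSketch();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            final int id = w;
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < perWriter; i++) {
                        guarded.put("writer" + id + "-" + i, "v");
                        Arrays.sort(guarded.snapshot());   // read-then-sort, as in the traces
                    }
                } catch (Throwable e) {
                    failure.compareAndSet(null, e);
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (failure.get() != null) {
            throw new IllegalStateException(failure.get());
        }
        return guarded.snapshot().length;
    }

    public static void main(String[] args) {
        System.out.println(stress(4, 500)); // prints 2000
    }
}
```

A ConcurrentHashMap would avoid the torn copy as well, but it drops insertion order, so it is not a drop-in replacement for a LinkedHashMap.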

@markrichardstm

I am seeing a similar problem in app code that had been working well until we upgraded Spring Boot (including an upgrade of Micrometer).

The use of observation is encapsulated in a method like so:

private final ObservationRegistry registry = [default provided in Spring Boot]
private final Scheduler customScheduler = Schedulers.fromExecutor(
    ExecutorServiceMetrics.monitor(new ThreadPoolExecutor([at least 10 threads])));

public <T> Mono<T> observe(String name, Mono<T> mono) {
    return Mono.deferContextual(view -> mono.publishOn(customScheduler).contextWrite(view).name(name)
        .tap(observation(registry)).subscribeOn(customScheduler));
}

WebFlux controllers call functionality that uses the observe function in various places to wrap IO operations that we want to keep off the WebFlux threads.

The upgrade notably consists of

  • Spring Boot 3.3.5 -> 3.4.0
  • micrometer 1.13.7 -> 1.14.1
  • micrometer tracing 1.13.6 -> 1.14.0
  • opentelemetry 1.44.1
  • opentelemetry java instrumentation + agent 2.10

We get many exceptions of type ArrayIndexOutOfBoundsException as the MicrometerObservationListener adds tags to an observation whilst a TracingObservationHandler tries to read them.

There is some magic happening above these (opentelemetry agent has instrumented itself into the flow), so perhaps this is a bug in how they have hooked in?

Using a debugger with only current thread suspended, I captured the following.

addLowCardinalityKeyValue:1115, Observation$Context (io.micrometer.observation)
lowCardinalityKeyValue:118, SimpleObservation (io.micrometer.observation)
lowCardinalityKeyValue:363, Observation (io.micrometer.observation)
doOnComplete:202, MicrometerObservationListener (reactor.core.observability.micrometer)
onComplete:279, FluxTap$TapSubscriber (reactor.core.publisher)
onComplete:92, TracingSubscriber (io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1)
onComplete:147, FluxHide$SuppressFuseableSubscriber (reactor.core.publisher)
onComplete:126, FluxContextWrite$ContextWriteSubscriber (reactor.core.publisher)
onComplete:92, TracingSubscriber (io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1)
run:190, MonoPublishOn$PublishOnSubscriber (reactor.core.publisher)
run:373, ContextPropagationOperator$RunnableWrapper (io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1)
call:68, SchedulerTask (reactor.core.scheduler)
call:28, SchedulerTask (reactor.core.scheduler)
call:51, TimedCallable (io.micrometer.core.instrument.internal)
run:317, FutureTask (java.util.concurrent)
runWorker:1144, ThreadPoolExecutor (java.util.concurrent)
run:642, ThreadPoolExecutor$Worker (java.util.concurrent)
runWith:1596, Thread (java.lang)
run:1583, Thread (java.lang)

and at the same time

getLowCardinalityKeyValues:1210, Observation$Context (io.micrometer.observation)
getAllKeyValues:1232, Observation$Context (io.micrometer.observation)
matchingBaggageKeyValues:114, RevertingScope (io.micrometer.tracing.handler)
maybeWithBaggage:81, RevertingScope (io.micrometer.tracing.handler)
setMaybeScopeOnTracingContext:100, TracingObservationHandler (io.micrometer.tracing.handler)
onScopeOpened:82, TracingObservationHandler (io.micrometer.tracing.handler)
onScopeOpened:173, ObservationHandler$FirstMatchingCompositeObservationHandler (io.micrometer.observation)
notifyOnScopeOpened:235, SimpleObservation (io.micrometer.observation)
openScope:194, SimpleObservation (io.micrometer.observation)
setValue:117, ObservationThreadLocalAccessor (io.micrometer.observation.contextpropagation)
setValue:33, ObservationThreadLocalAccessor (io.micrometer.observation.contextpropagation)
setThreadLocal:102, DefaultContextSnapshot (io.micrometer.context)
setAllThreadLocalsFrom:125, DefaultContextSnapshotFactory (io.micrometer.context)
setThreadLocalsFrom:109, DefaultContextSnapshotFactory (io.micrometer.context)
restoreThreadLocals:366, ContextPropagation$ContextRestore103SignalListener (reactor.core.publisher)
doOnRequest:271, ContextPropagation$ContextRestoreSignalListener (reactor.core.publisher)
request:301, FluxTap$TapSubscriber (reactor.core.publisher)
lambda$trySchedule$0:193, MonoSubscribeOn$SubscribeOnSubscriber (reactor.core.publisher)
run:-1, MonoSubscribeOn$SubscribeOnSubscriber$$Lambda/0x0000004001d7b6d0 (reactor.core.publisher)
run:373, ContextPropagationOperator$RunnableWrapper (io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1)
call:84, WorkerTask (reactor.core.scheduler)
call:37, WorkerTask (reactor.core.scheduler)
call:51, TimedCallable (io.micrometer.core.instrument.internal)
run:317, FutureTask (java.util.concurrent)
runWorker:1144, ThreadPoolExecutor (java.util.concurrent)
run:642, ThreadPoolExecutor$Worker (java.util.concurrent)
runWith:1596, Thread (java.lang)
run:1583, Thread (java.lang)

and also at the same time

getLowCardinalityKeyValues:1210, Observation$Context (io.micrometer.observation)
getAllKeyValues:1232, Observation$Context (io.micrometer.observation)
matchingBaggageKeyValues:114, RevertingScope (io.micrometer.tracing.handler)
maybeWithBaggage:81, RevertingScope (io.micrometer.tracing.handler)
setMaybeScopeOnTracingContext:100, TracingObservationHandler (io.micrometer.tracing.handler)
onScopeOpened:82, TracingObservationHandler (io.micrometer.tracing.handler)
onScopeOpened:173, ObservationHandler$FirstMatchingCompositeObservationHandler (io.micrometer.observation)
notifyOnScopeOpened:235, SimpleObservation (io.micrometer.observation)
openScope:194, SimpleObservation (io.micrometer.observation)
setValue:117, ObservationThreadLocalAccessor (io.micrometer.observation.contextpropagation)
setValue:33, ObservationThreadLocalAccessor (io.micrometer.observation.contextpropagation)
setThreadLocal:102, DefaultContextSnapshot (io.micrometer.context)
setAllThreadLocalsFrom:125, DefaultContextSnapshotFactory (io.micrometer.context)
setThreadLocalsFrom:109, DefaultContextSnapshotFactory (io.micrometer.context)
restoreThreadLocals:366, ContextPropagation$ContextRestore103SignalListener (reactor.core.publisher)
doAfterComplete:306, ContextPropagation$ContextRestoreSignalListener (reactor.core.publisher)
onComplete:289, FluxTap$TapSubscriber (reactor.core.publisher)
onComplete:92, TracingSubscriber (io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1)
onComplete:147, FluxHide$SuppressFuseableSubscriber (reactor.core.publisher)
onComplete:126, FluxContextWrite$ContextWriteSubscriber (reactor.core.publisher)
onComplete:92, TracingSubscriber (io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1)
run:182, MonoPublishOn$PublishOnSubscriber (reactor.core.publisher)
run:373, ContextPropagationOperator$RunnableWrapper (io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1)
call:68, SchedulerTask (reactor.core.scheduler)
call:28, SchedulerTask (reactor.core.scheduler)
call:51, TimedCallable (io.micrometer.core.instrument.internal)
run:317, FutureTask (java.util.concurrent)
runWorker:1144, ThreadPoolExecutor (java.util.concurrent)
run:642, ThreadPoolExecutor$Worker (java.util.concurrent)
runWith:1596, Thread (java.lang)
run:1583, Thread (java.lang)

I don't believe our code is doing anything it is not supposed to. Instead, I think we have chanced on a bug somewhere between Spring, Micrometer and OpenTelemetry. Micrometer looks most likely, given how close it sits in the stack to the method calls shown here.

Any help would be greatly appreciated; it has been great to use this instrumentation.

@ttddyy
Contributor

ttddyy commented Nov 29, 2024

I believe the use of publishOn and subscribeOn might be causing this multithreading issue.
(Possibly using them within deferContextual could also have an influence.)

The first stacktrace originates from the HTTP server processing the request.
The second stacktrace includes MonoSubscribeOn$SubscribeOnSubscriber$$Lambda/0x0000004001d7b6d0, indicating a connection to subscribeOn.
The third stack trace shows MonoPublishOn$PublishOnSubscriber, which is associated with publishOn.

Could the use of publishOn and subscribeOn (potentially within deferContextual) be leading to concurrent processing?

I'm not very familiar with Reactor internals—perhaps @chemicL could provide some insight here?

@chemicL

chemicL commented Nov 29, 2024

Hey, nothing here rings a bell outside of the suspicious
onComplete:92, TracingSubscriber (io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1)
As far as I can tell, this is an alternative to Micrometer and not something that should work alongside it. Perhaps they're stepping on each other's toes?

@ttddyy
Contributor

ttddyy commented Nov 30, 2024

Looking at the OTel code, the ContextPropagationOperator registers a TracingSubscriber.

The TracingSubscriber opens a scope for each operation, such as onSubscribe, onNext, and onComplete.
As a result, when publishOn and subscribeOn operators perform their operations, each of them opens a scope. Micrometer's TracingObservationHandler reacts to these scopes and invokes getLowCardinalityKeyValues.

I'm not sure why publishOn, subscribeOn, and the main http request handling are happening concurrently, but I think it is the reason for the concurrent access to the key-values.

@shakuzen
Member

shakuzen commented Dec 2, 2024

I'd have the same guess as @chemicL. It looks like Reactor is being double instrumented - by Micrometer (the tap(observation(...)) part) and by the OTel Java Agent (the ContextPropagationOperator).
@markrichardstm could you try disabling one of those and see if the issue still happens?

@markrichardstm

@shakuzen

I disabled the opentelemetry reactor integration and still see the ArrayIndexOutOfBoundsException

But the stack traces are now like so...

Add:

addLowCardinalityKeyValue:1115, Observation$Context (io.micrometer.observation)
lowCardinalityKeyValue:118, SimpleObservation (io.micrometer.observation)
lowCardinalityKeyValue:363, Observation (io.micrometer.observation)
doOnComplete:202, MicrometerObservationListener (reactor.core.observability.micrometer)
onComplete:279, FluxTap$TapSubscriber (reactor.core.publisher)
onComplete:126, FluxContextWrite$ContextWriteSubscriber (reactor.core.publisher)
run:190, MonoPublishOn$PublishOnSubscriber (reactor.core.publisher)
call:68, SchedulerTask (reactor.core.scheduler)
call:28, SchedulerTask (reactor.core.scheduler)
call:51, TimedCallable (io.micrometer.core.instrument.internal)
run:317, FutureTask (java.util.concurrent)
runWorker:1144, ThreadPoolExecutor (java.util.concurrent)
run:642, ThreadPoolExecutor$Worker (java.util.concurrent)
runWith:1596, Thread (java.lang)
run:1583, Thread (java.lang)

Get:

getLowCardinalityKeyValues:1210, Observation$Context (io.micrometer.observation)
getAllKeyValues:1232, Observation$Context (io.micrometer.observation)
matchingBaggageKeyValues:114, RevertingScope (io.micrometer.tracing.handler)
maybeWithBaggage:81, RevertingScope (io.micrometer.tracing.handler)
setMaybeScopeOnTracingContext:100, TracingObservationHandler (io.micrometer.tracing.handler)
onScopeOpened:82, TracingObservationHandler (io.micrometer.tracing.handler)
onScopeOpened:173, ObservationHandler$FirstMatchingCompositeObservationHandler (io.micrometer.observation)
notifyOnScopeOpened:235, SimpleObservation (io.micrometer.observation)
openScope:194, SimpleObservation (io.micrometer.observation)
setValue:117, ObservationThreadLocalAccessor (io.micrometer.observation.contextpropagation)
setValue:33, ObservationThreadLocalAccessor (io.micrometer.observation.contextpropagation)
setThreadLocal:102, DefaultContextSnapshot (io.micrometer.context)
setAllThreadLocalsFrom:125, DefaultContextSnapshotFactory (io.micrometer.context)
setThreadLocalsFrom:109, DefaultContextSnapshotFactory (io.micrometer.context)
restoreThreadLocals:366, ContextPropagation$ContextRestore103SignalListener (reactor.core.publisher)
doOnRequest:271, ContextPropagation$ContextRestoreSignalListener (reactor.core.publisher)
request:301, FluxTap$TapSubscriber (reactor.core.publisher)
lambda$trySchedule$0:193, MonoSubscribeOn$SubscribeOnSubscriber (reactor.core.publisher)
run:-1, MonoSubscribeOn$SubscribeOnSubscriber$$Lambda/0x0000002001d84460 (reactor.core.publisher)
call:84, WorkerTask (reactor.core.scheduler)
call:37, WorkerTask (reactor.core.scheduler)
call:51, TimedCallable (io.micrometer.core.instrument.internal)
run:317, FutureTask (java.util.concurrent)
runWorker:1144, ThreadPoolExecutor (java.util.concurrent)
run:642, ThreadPoolExecutor$Worker (java.util.concurrent)
runWith:1596, Thread (java.lang)
run:1583, Thread (java.lang)

And another get:

getLowCardinalityKeyValues:1210, Observation$Context (io.micrometer.observation)
getAllKeyValues:1232, Observation$Context (io.micrometer.observation)
matchingBaggageKeyValues:114, RevertingScope (io.micrometer.tracing.handler)
maybeWithBaggage:81, RevertingScope (io.micrometer.tracing.handler)
setMaybeScopeOnTracingContext:100, TracingObservationHandler (io.micrometer.tracing.handler)
onScopeOpened:82, TracingObservationHandler (io.micrometer.tracing.handler)
onScopeOpened:173, ObservationHandler$FirstMatchingCompositeObservationHandler (io.micrometer.observation)
notifyOnScopeOpened:235, SimpleObservation (io.micrometer.observation)
openScope:194, SimpleObservation (io.micrometer.observation)
setValue:117, ObservationThreadLocalAccessor (io.micrometer.observation.contextpropagation)
setValue:33, ObservationThreadLocalAccessor (io.micrometer.observation.contextpropagation)
setThreadLocal:102, DefaultContextSnapshot (io.micrometer.context)
setAllThreadLocalsFrom:125, DefaultContextSnapshotFactory (io.micrometer.context)
setThreadLocalsFrom:109, DefaultContextSnapshotFactory (io.micrometer.context)
restoreThreadLocals:366, ContextPropagation$ContextRestore103SignalListener (reactor.core.publisher)
doAfterComplete:306, ContextPropagation$ContextRestoreSignalListener (reactor.core.publisher)
onComplete:289, FluxTap$TapSubscriber (reactor.core.publisher)
onComplete:126, FluxContextWrite$ContextWriteSubscriber (reactor.core.publisher)
run:182, MonoPublishOn$PublishOnSubscriber (reactor.core.publisher)
call:68, SchedulerTask (reactor.core.scheduler)
call:28, SchedulerTask (reactor.core.scheduler)
call:51, TimedCallable (io.micrometer.core.instrument.internal)
run:317, FutureTask (java.util.concurrent)
runWorker:1144, ThreadPoolExecutor (java.util.concurrent)
run:642, ThreadPoolExecutor$Worker (java.util.concurrent)
runWith:1596, Thread (java.lang)
run:1583, Thread (java.lang)

@chemicL

chemicL commented Dec 2, 2024

Thanks for doing that @markrichardstm

That clears the way for further investigation :) I believe there is merit in what @ttddyy noticed. The use of publishOn in combination with subscribeOn should never be a problem, even if you add defer into the mix. However, it's worth noting that you are using a custom Scheduler based on an Executor. That bit is worrying, since (as per our javadoc):

	 * <p>Tasks scheduled with workers of this Scheduler are not guaranteed to run in FIFO
	 * order and strictly non-concurrently.
	 * If FIFO order is desired, use trampoline parameter of {@link Schedulers#fromExecutor(Executor, boolean)}

Consider changing the below:

Scheduler customScheduler = Schedulers.fromExecutor(ExecutorServiceMetrics.monitor(new ThreadPoolExecutor([at least 10 threads])));

to use Schedulers.fromExecutorService:

Scheduler customScheduler = Schedulers.fromExecutorService(ExecutorServiceMetrics.monitor(new ThreadPoolExecutor([at least 10 threads])));

I am assuming you can get an ExecutorService back from ExecutorServiceMetrics.monitor(), since it seems the overloads allow it.

Why would this help? Executing things in FIFO order and in isolation is essential to providing the reactive-streams guarantees, and both operators from the same chain should not execute concurrently (assuming they run against the same instance).
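
To make the FIFO and non-concurrency point concrete, here is a minimal sketch in plain Java. It is not Reactor's implementation and does not use Reactor's API: `SerialExecutor` and `maxConcurrency` are hypothetical names, and the wrapper follows the well-known serial-executor pattern described in the `java.util.concurrent.Executor` javadoc. It only illustrates the difference between handing tasks straight to a multi-threaded pool and funnelling them through something that runs them one at a time, in submission order.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SerialExecutorSketch {

    /** Hypothetical wrapper: runs tasks one at a time, in submission order, on a shared pool. */
    static final class SerialExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final Executor delegate;
        private Runnable active;

        SerialExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized void execute(Runnable task) {
            // Wrap the task so that finishing it schedules the next queued one.
            tasks.add(() -> {
                try {
                    task.run();
                } finally {
                    scheduleNext();
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        private synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null) {
                delegate.execute(active);
            }
        }
    }

    /** Submits count tasks and returns the highest number of tasks observed running at once. */
    static int maxConcurrency(Executor executor, int count) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            executor.execute(() -> {
                max.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(2); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            // The raw pool may run several tasks at once; the wrapper never does.
            System.out.println("raw pool:   " + maxConcurrency(pool, 50));
            System.out.println("serialized: " + maxConcurrency(new SerialExecutor(pool), 50));
        } finally {
            pool.shutdown();
        }
    }
}
```

The raw pool usually reports a value above 1 (this depends on timing), while the serialized variant always reports 1 because the next task is only handed to the pool after the previous one has returned.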

@markrichardstm

Thanks @chemicL

fromExecutorService didn't seem to work.

I'm not sure I understand the FIFO comment, as this wraps a single Mono. I don't suppose you have any further documentation on this that I could read?

We are executing a mono flow that involves multiple calls to (sometimes different) async IO clients and this means it has to jump threads to avoid blocking.

The following is a contrived method, but I managed to see the same concurrent activity using a debugger with this

Note: the mono argument is completed via a Mono.fromCompletionStage(() -> CompletableFuture...)

  public <T> Mono<T> observe(String name, Mono<T> mono) {
    return Mono.deferContextual(view -> Mono.delay(Duration.ofMillis(10)).then(mono).contextWrite(view).name(name)
        .tap(observation(registry)).flatMap(x -> Mono.delay(Duration.ofMillis(10)).thenReturn(x)));
  }

As you can see this includes no custom schedulers, just the default delay scheduler simulating additional async activity happening.

@chemicL

chemicL commented Dec 2, 2024

And in the code that uses fromExecutorService if you instead use Schedulers.boundedElastic just for the sake of it, do you run into the issue?

For the Mono.fromCompletionStage - it should not be an issue either. But interestingly this example does not involve subscribeOn nor publishOn. Can you perhaps provide a minimal reproducer as a zip/github repository?

@markrichardstm

markrichardstm commented Dec 3, 2024

parallel.zip

Here is a minimal example that you can play with. I asked someone else to test it and they could reproduce the problem, so it should work.

I had trouble recreating this without Mono.toFuture() being called so perhaps the problem is related to that?

Hopefully, the HELP.md explains how to use this, but please note the instructions regarding breakpoints: they must not suspend all threads (else that will prevent race conditions whilst debugging).

@markrichardstm

@chemicL sorry, I forgot to swap to boundedElastic in that example, but checking today, it suffers from the problem too.

You can swap by replacing

this.customScheduler = customScheduler;

with

this.customScheduler = Schedulers.boundedElastic();

@chemicL

chemicL commented Dec 5, 2024

@markrichardstm I had a look at the project you provided and have some observations (wink, wink).

First, I had a suspicion that the problem is the observe method:

    public <T> Mono<T> observe(String name, Mono<T> mono) {
        return deferContextual(view -> mono.publishOn(customScheduler).contextWrite(view).name(name)
                .tap(observation(registry)).subscribeOn(customScheduler));
    }

Let me annotate it a bit:

    public <T> Mono<T> observe(String name, Mono<T> mono) {
	    return deferContextual(view -> mono.publishOn(customScheduler)
	                                       .contextWrite(view) // <1>
	                                       .name(name)
	                                       .tap(observation(registry)) // <2>
	                                       .subscribeOn(customScheduler));
    }

We should analyze the chain returned from the deferContextual's lambda argument from the bottom, but first let's discuss deferContextual. Subscribing to Mono.deferContextual uses Reactor's Context from downstream in the chain, that is, its Subscriber's Context. This downstream Context is available in view; let's call it CONTEXT_PARENT.

Let's head on to the inner chain:

  • In <2> the downstream Context is modified and a child Observation is created and is present in the Context seen upstream (in the preceding lines of code) - Let's call it CONTEXT_CHILD.
  • In <1> this instruction replaces contents of CONTEXT_CHILD with contents of CONTEXT_PARENT. Specifically, it replaces the child Observation with its parent Observation for the use of the upstream chain.

I believe this is absolutely undesired and breaks the expectations that reactive-streams build upon - it can no longer be Thread agnostic as the state is mutable:

  • t0: delayedStringMono("a") delivers the result on Thread_1 and triggers delayedStringMono("ab")
  • t1: delayedStringMono("ab") delivers its result on Thread_2 and completes, triggering Observation mutation
  • t1: delayedStringMono("a") completes its execution, triggering Observation mutation.

As you can see, this comes from the mistake of replacing the ContextView instead of relying on the tap operator to spawn a child Observation. Unfortunately, this was not the actual issue (yet it is still worth fixing on your end :))

Even when we fix the above and also fix the Schedulers.fromExecutor usage to avoid concurrent execution of tasks within the same chain (since these are Monos there should not be a problem, but flatMap can be parallel, so it is always worth considering), we still (!) run into the problem.

So what's going on? It's called "previous values". As Reactor processes various things in parallel, it can, and will happen, that a particular Observation is in one Thread used as the value "to come back to" and in another one it is the current one being modified. That happens with the parent-child nature (although unrelated instances can run into the same issue) between created Observations and when ThreadLocal restoration takes place.

Now, what's happening in the tap operator combined with reactor.core.observability.micrometer.Micrometer.observation?

When the chain is completed, it just adds a new key-value pair to the low-cardinality keys. If this same Observation is used by another Thread to restore ThreadLocals there is a risk of both reading the key-value pairs and modifying them at the same time.
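
The "previous values" situation can be sketched without Reactor or Micrometer. The following is a simplified model under stated assumptions: `Obs`, `Scope` and `CURRENT` are hypothetical stand-ins for an Observation, its scope and the ThreadLocal holding the current one, not the real classes. It only shows that one instance can be the value "to come back to" on one thread while being the current value on another.

```java
import java.util.concurrent.atomic.AtomicReference;

public class PreviousValueSketch {

    /** Hypothetical stand-in for an Observation; only its identity matters here. */
    static final class Obs {
        final String name;

        Obs(String name) {
            this.name = name;
        }
    }

    /** The "current" value of each thread. */
    static final ThreadLocal<Obs> CURRENT = new ThreadLocal<>();

    /** Opening a scope remembers the previous value; closing it restores that value. */
    static final class Scope implements AutoCloseable {
        private final Obs previous;

        Scope(Obs next) {
            this.previous = CURRENT.get();
            CURRENT.set(next);
        }

        @Override
        public void close() {
            CURRENT.set(previous);
        }
    }

    /** Returns true when parent is "previous" on the calling thread and "current" on a worker. */
    static boolean sharedAcrossThreads() {
        Obs parent = new Obs("parent");
        Obs child = new Obs("child");
        AtomicReference<Obs> currentOnWorker = new AtomicReference<>();

        try (Scope parentScope = new Scope(parent);
             Scope childScope = new Scope(child)) {
            // On this thread, child is current and parent is the value to restore later.
            Thread worker = new Thread(() -> {
                try (Scope workerScope = new Scope(parent)) {
                    currentOnWorker.set(CURRENT.get()); // parent is current on the worker
                }
            });
            worker.start();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return CURRENT.get() == child && currentOnWorker.get() == parent;
        }
    }

    public static void main(String[] args) {
        System.out.println("same instance visible on two threads: " + sharedAcrossThreads());
    }
}
```

Nothing in this model is unsafe by itself. The hazard appears once the shared instance carries mutable state that one thread writes while the other reads it.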

There were some recent optimizations (#4959 and #5692), but I don't see them as the reason for the issue. The reason is the use of Collection#toArray in this code of Micrometer's KeyValues class:

                Collection<? extends KeyValue> keyValuesCollection = (Collection)keyValues;
                return make((KeyValue[])keyValuesCollection.toArray(new KeyValue[0]));

I believe the problem has been lying dormant for some time and was only uncovered recently, when RevertingScope.matchingBaggageKeyValues() was added and started using the Collection access path.
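
Why a concurrent write can break such a copy is easier to see when the interleaving is replayed on a single thread. The sketch below is a simplified model, not the JDK's or Micrometer's code: it assumes the copy works in two steps (size an array, then walk the entries), which is the shape the `valuesToArray` frame in the stack traces suggests. The class name, method name and the key-values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SizeThenCopySketch {

    /**
     * Replays on one thread the interleaving that two threads could produce:
     * the reader sizes its array, the writer adds a key-value, the reader copies.
     * Returns the exception message, or null if the copy succeeded.
     */
    static String replayInterleaving() {
        Map<String, String> lowCardinality = new LinkedHashMap<>();
        lowCardinality.put("warmup", "false");

        // Reader, step 1: allocate an array for the current size (1 entry).
        String[] snapshot = new String[lowCardinality.size()];

        // Writer (another thread in the real scenario): add a key-value on completion.
        lowCardinality.put("status", "completed");

        // Reader, step 2: copy every value into the array that was sized in step 1.
        int index = 0;
        try {
            for (String value : lowCardinality.values()) {
                snapshot[index++] = value;
            }
            return null;
        } catch (ArrayIndexOutOfBoundsException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replayInterleaving());
    }
}
```

On recent JDKs the message has the same "Index N out of bounds for length N" shape as the errors reported in this thread.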

@shakuzen @marcingrzejszczak @jonatan-ivanov FYI ^

@lorenzopatane

lorenzopatane commented Dec 9, 2024

Hello, I will add my two cents:

after upgrading to Spring Boot 3.4.0 (which inherits Micrometer 1.14.1) I get this stacktrace quite often.

reactor.core.publisher.Operators         : Operator called default onErrorDropped

java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
	at java.base/java.util.LinkedHashMap.valuesToArray(Unknown Source) ~[na:na]
	at java.base/java.util.LinkedHashMap$LinkedValues.toArray(Unknown Source) ~[na:na]
	at io.micrometer.common.KeyValues.of(KeyValues.java:365) ~[micrometer-commons-1.14.1.jar:1.14.1]
	at io.micrometer.observation.Observation$Context.getLowCardinalityKeyValues(Observation.java:1210) ~[micrometer-observation-1.14.1.jar:1.14.1]
	at io.micrometer.observation.Observation$Context.getAllKeyValues(Observation.java:1232) ~[micrometer-observation-1.14.1.jar:1.14.1]
	at io.micrometer.tracing.handler.RevertingScope.matchingBaggageKeyValues(RevertingScope.java:114) ~[micrometer-tracing-1.4.0.jar:1.4.0]
	at io.micrometer.tracing.handler.RevertingScope.maybeWithBaggage(RevertingScope.java:81) ~[micrometer-tracing-1.4.0.jar:1.4.0]
	at io.micrometer.tracing.handler.TracingObservationHandler.setMaybeScopeOnTracingContext(TracingObservationHandler.java:100) ~[micrometer-tracing-1.4.0.jar:1.4.0]
	at io.micrometer.tracing.handler.TracingObservationHandler.onScopeOpened(TracingObservationHandler.java:82) ~[micrometer-tracing-1.4.0.jar:1.4.0]
	at io.micrometer.observation.ObservationHandler$FirstMatchingCompositeObservationHandler.onScopeOpened(ObservationHandler.java:173) ~[micrometer-observation-1.14.1.jar:1.14.1]
	at io.micrometer.observation.SimpleObservation.notifyOnScopeOpened(SimpleObservation.java:235) ~[micrometer-observation-1.14.1.jar:1.14.1]
	at io.micrometer.observation.SimpleObservation.openScope(SimpleObservation.java:194) ~[micrometer-observation-1.14.1.jar:1.14.1]
	at io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor.setValue(ObservationThreadLocalAccessor.java:117) ~[micrometer-observation-1.14.1.jar:1.14.1]
	at io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor.setValue(ObservationThreadLocalAccessor.java:33) ~[micrometer-observation-1.14.1.jar:1.14.1]
	at io.micrometer.context.DefaultContextSnapshot.setThreadLocal(DefaultContextSnapshot.java:102) ~[context-propagation-1.1.2.jar:1.1.2]
	at io.micrometer.context.DefaultContextSnapshotFactory.setAllThreadLocalsFrom(DefaultContextSnapshotFactory.java:125) ~[context-propagation-1.1.2.jar:1.1.2]
	at io.micrometer.context.DefaultContextSnapshotFactory.setThreadLocalsFrom(DefaultContextSnapshotFactory.java:109) ~[context-propagation-1.1.2.jar:1.1.2]
	at reactor.core.publisher.ContextPropagation.setThreadLocals(ContextPropagation.java:85) ~[reactor-core-3.7.0.jar:3.7.0]
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:148) ~[reactor-core-3.7.0.jar:3.7.0]
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:122) ~[reactor-core-3.7.0.jar:3.7.0]
	at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:67) ~[reactor-core-3.7.0.jar:3.7.0]
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[na:na]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.setFinalResult(CqlRequestHandler.java:326) ~[java-driver-core-4.18.1.jar:4.18.1]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.access$1500(CqlRequestHandler.java:97) ~[java-driver-core-4.18.1.jar:4.18.1]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.onResponse(CqlRequestHandler.java:657) ~[java-driver-core-4.18.1.jar:4.18.1]
	at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:259) ~[java-driver-core-4.18.1.jar:4.18.1]
........
.......

I think this is related to this issue. Thanks for reading this.

shakuzen added a commit that referenced this issue Dec 10, 2024
Using a ConcurrentHashMap for the Context keyvalues (low and high) avoids the issues with concurrent reads and writes. Related benchmarks and concurrency tests were added as well.

See gh-4356

---------

Co-authored-by: Jonatan Ivanov
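
As a rough illustration of why a ConcurrentHashMap helps, the sketch below runs one writer and one reader against the same map. It is not Micrometer's code or its concurrency tests: `readerFailures` is a hypothetical helper, and plain strings stand in for key-values. It relies on ConcurrentHashMap's documented support for reads that overlap with updates.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentKeyValuesSketch {

    /** Runs one writer and one reader on the same map; returns how many reads threw. */
    static int readerFailures(Map<String, String> keyValues, int writes) {
        AtomicInteger failures = new AtomicInteger();

        Thread writer = new Thread(() -> {
            for (int i = 0; i < writes; i++) {
                keyValues.put("key" + i, "value" + i);
            }
        });

        Thread reader = new Thread(() -> {
            // Keep copying the values to an array for as long as the writer is adding entries.
            while (writer.isAlive()) {
                try {
                    keyValues.values().toArray(new String[0]);
                } catch (RuntimeException e) {
                    failures.incrementAndGet();
                }
            }
        });

        writer.start();
        reader.start();
        try {
            writer.join();
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        int failures = readerFailures(new ConcurrentHashMap<>(), 100_000);
        System.out.println("reader failures with ConcurrentHashMap: " + failures);
    }
}
```

Passing a LinkedHashMap instead is not safe and may fail intermittently, which is the behaviour reported in this issue. One trade-off of this choice is that ConcurrentHashMap does not preserve insertion order.
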
@shakuzen
Member

Could those who were seeing this issue please try with Micrometer 1.14.2? We believe this change should have fixed it.

@lorenzopatane

> Could those who were seeing this issue please try with Micrometer 1.14.2? We believe this change should have fixed it.

@shakuzen, I can confirm that the issue from my side is fixed now.
I see no ArrayIndexOutOfBoundsException or other exceptions related to Micrometer.
Thanks.

@markrichardstm

@shakuzen a quick test suggests it looks okay. I'll know more once we can run larger workloads through, which is unlikely to be this week.

@shakuzen
Member

We'll close this issue for now under the assumption that it is fixed by the mentioned change. If anyone finds that is not the case, please report back here with the details found and we can reopen the issue.

@markrichardstm

@shakuzen we have put a larger workload through and not seen this exception so it looks fixed. Thank you
