Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out reference counting to explicit API #3081

Merged
merged 9 commits into from
Jun 5, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,24 @@
*/
package co.elastic.apm.agent.collections;

import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.tracer.reference.ReferenceCounted;

import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

/**
* Hash map dedicated to storage of in-flight spans and transactions, reference count is being incremented/decremented
* Hash map dedicated to storage of reference counted objects where counts are being incremented/decremented
* when entry is added/removed. Usage of this map is intended for providing GC-based storage of context associated
* to a framework-level object key, when the latter is collected by GC it allows to decrement and then recycle the
* span/transaction.
*
* @param <K> key type
* @param <V> context type
*/
public class SpanConcurrentHashMap<K, V extends AbstractSpan<?>> extends ConcurrentHashMap<K, V> {
public class ReferencedCountedConcurrentHashMap<K, V extends ReferenceCounted> extends ConcurrentHashMap<K, V> {

SpanConcurrentHashMap() {
ReferencedCountedConcurrentHashMap() {
}

@Nullable
Expand Down Expand Up @@ -71,7 +71,7 @@ public void clear() {
super.clear();
}

private void onPut(@Nullable AbstractSpan<?> previous, AbstractSpan<?> value) {
private void onPut(@Nullable ReferenceCounted previous, ReferenceCounted value) {
if (previous == null) {
// new entry
value.incrementReferences();
Expand All @@ -82,7 +82,7 @@ private void onPut(@Nullable AbstractSpan<?> previous, AbstractSpan<?> value) {
}
}

private void onRemove(@Nullable AbstractSpan<?> removed) {
private void onRemove(@Nullable ReferenceCounted removed) {
if (removed == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakSet;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.tracer.reference.ReferenceCounted;
import com.blogspot.mydailyjava.weaklockfree.AbstractWeakConcurrentMap;
import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentSet;

Expand All @@ -37,8 +37,8 @@ public class WeakConcurrentProviderImpl implements WeakConcurrent.WeakConcurrent

private static final WeakConcurrentSet<AbstractWeakConcurrentMap<?, ?, ?>> registeredMaps = new WeakConcurrentSet<>(WeakConcurrentSet.Cleaner.INLINE);

public static <K, V extends AbstractSpan<?>> WeakMap<K, V> createWeakSpanMap() {
SpanConcurrentHashMap<AbstractWeakConcurrentMap.WeakKey<K>, V> map = new SpanConcurrentHashMap<>();
public static <K, V extends ReferenceCounted> WeakMap<K, V> createWeakReferenceCountedMap() {
ReferencedCountedConcurrentHashMap<AbstractWeakConcurrentMap.WeakKey<K>, V> map = new ReferencedCountedConcurrentHashMap<>();
NullSafeWeakConcurrentMap<K, V> result = new NullSafeWeakConcurrentMap<>(map);
registeredMaps.add(result);
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package co.elastic.apm.agent.collections;

import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import co.elastic.apm.agent.tracer.reference.ReferenceCounted;
import co.elastic.apm.agent.tracer.reference.ReferenceCounter;

import javax.annotation.Nullable;

public class WeakMapReferenceCounter<K, V extends ReferenceCounted> implements ReferenceCounter<K, V> {
raphw marked this conversation as resolved.
Show resolved Hide resolved

private final WeakMap<K, V> map;

public WeakMapReferenceCounter(WeakMap<K, V> map) {
this.map = map;
}

@Override
@Nullable
public V get(K key) {
return map.get(key);
}

@Override
public boolean contains(K key) {
return map.containsKey(key);
}

@Override
public void put(K key, V value) {
map.put(key, value);
}

@Override
@Nullable
public V remove(K key) {
return map.remove(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package co.elastic.apm.agent.impl;

import co.elastic.apm.agent.collections.WeakConcurrentProviderImpl;
import co.elastic.apm.agent.collections.WeakMapReferenceCounter;
import co.elastic.apm.agent.common.JvmRuntimeInfo;
import co.elastic.apm.agent.common.util.WildcardMatcher;
import co.elastic.apm.agent.configuration.CoreConfiguration;
Expand Down Expand Up @@ -49,6 +51,10 @@
import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.reference.ReferenceCounted;
import co.elastic.apm.agent.tracer.reference.ReferenceCounter;
import co.elastic.apm.agent.util.DependencyInjectingServiceLoader;
import co.elastic.apm.agent.util.ExecutorUtils;
import co.elastic.apm.agent.tracer.Scope;
import co.elastic.apm.agent.tracer.dispatch.BinaryHeaderGetter;
import co.elastic.apm.agent.tracer.dispatch.TextHeaderGetter;
Expand Down Expand Up @@ -597,6 +603,11 @@ public ObjectPoolFactory getObjectPoolFactory() {
return objectPoolFactory;
}

@Override
public <K, V extends ReferenceCounted> ReferenceCounter<K, V> createReferenceCounter() {
return new WeakMapReferenceCounter<>(WeakConcurrentProviderImpl.<K, V>createWeakReferenceCountedMap());
}

@Override
@Nullable
public AbstractSpan<?> getActive() {
Expand Down Expand Up @@ -938,5 +949,4 @@ public <T extends co.elastic.apm.agent.tracer.Tracer> T require(Class<T> type) {
public Set<String> getTraceHeaderNames() {
return TraceContext.TRACE_TEXTUAL_HEADERS;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class SpanConcurrentHashMapTest {
class ReferencedCountedConcurrentHashMapTest {

@Nullable
private Object key;
Expand All @@ -45,7 +45,7 @@ void putRemove() {
checkRefCount(testSpan, 0);

key = new Object();
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakSpanMap();
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakReferenceCountedMap();
map.put(key, testSpan);

checkRefCount(testSpan, 1);
Expand All @@ -63,7 +63,7 @@ void putRemove() {
void putTwice(PutOperation operation) {
TestSpan testSpan = new TestSpan();
key = new Object();
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakSpanMap();
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakReferenceCountedMap();

checkRefCount(testSpan, 0);

Expand All @@ -82,7 +82,7 @@ void swapValues() {
TestSpan ts2 = new TestSpan();

key = new Object();
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakSpanMap();
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakReferenceCountedMap();

map.put(key, ts1);
map.put(key, ts2);
Expand All @@ -99,7 +99,7 @@ void testPutIfAbsent() {
TestSpan ts2 = new TestSpan();

key = new Object();
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakSpanMap();
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakReferenceCountedMap();

map.putIfAbsent(key, ts1);
map.putIfAbsent(key, ts2);
Expand Down Expand Up @@ -130,7 +130,7 @@ void execute(WeakMap<Object, TestSpan> map, Object key, TestSpan value) {

@Test
void clear() {
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakSpanMap();
WeakMap<Object, TestSpan> map = WeakConcurrentProviderImpl.createWeakReferenceCountedMap();

List<AbstractSpan<?>> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Expand All @@ -153,7 +153,7 @@ void weakMapDecrementOnStaleKeyGC() {
key = new Object();
TestSpan span = new TestSpan();

WeakMap<Object, AbstractSpan<?>> map = WeakConcurrentProviderImpl.createWeakSpanMap();
WeakMap<Object, AbstractSpan<?>> map = WeakConcurrentProviderImpl.createWeakReferenceCountedMap();

map.put(key, span);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
package co.elastic.apm.agent.asynchttpclient;

import co.elastic.apm.agent.bci.TracerAwareInstrumentation;
import co.elastic.apm.agent.collections.WeakConcurrentProviderImpl;
import co.elastic.apm.agent.httpclient.HttpClientHelper;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.sdk.DynamicTransformer;
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import co.elastic.apm.agent.tracer.reference.ReferenceCounter;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -54,7 +53,7 @@ public abstract class AbstractAsyncHttpClientInstrumentation extends TracerAware

public static class Helper {

static final WeakMap<AsyncHandler<?>, Span> handlerSpanMap = WeakConcurrentProviderImpl.createWeakSpanMap();
static final ReferenceCounter<AsyncHandler<?>, Span> handlerSpanMap = tracer.createReferenceCounter();

public static final List<Class<? extends ElasticApmInstrumentation>> ASYNC_HANDLER_INSTRUMENTATIONS = Arrays.<Class<? extends ElasticApmInstrumentation>>asList(
AsyncHandlerOnCompletedInstrumentation.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
*/
package co.elastic.apm.agent.dubbo;

import co.elastic.apm.agent.collections.WeakConcurrentProviderImpl;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.reference.ReferenceCounter;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;

public class AlibabaCallbackHolder {
public static final WeakMap<ResponseCallback, AbstractSpan<?>> callbackSpanMap = WeakConcurrentProviderImpl.createWeakSpanMap();
public static final ReferenceCounter<ResponseCallback, AbstractSpan<?>> callbackSpanMap = GlobalTracer.get().createReferenceCounter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package co.elastic.apm.agent.grpc;

import co.elastic.apm.agent.collections.WeakConcurrentProviderImpl;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.tracer.Outcome;
Expand All @@ -31,6 +30,7 @@
import co.elastic.apm.agent.tracer.dispatch.HeaderUtils;
import co.elastic.apm.agent.tracer.dispatch.TextHeaderGetter;
import co.elastic.apm.agent.tracer.dispatch.TextHeaderSetter;
import co.elastic.apm.agent.tracer.reference.ReferenceCounter;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Metadata;
Expand Down Expand Up @@ -58,27 +58,27 @@ public static GrpcHelper getInstance() {
/**
* Map of all in-flight {@link Span} with {@link ClientCall} instance as key.
*/
private final WeakMap<ClientCall<?, ?>, Span<?>> clientCallSpans;
private final ReferenceCounter<ClientCall<?, ?>, Span<?>> clientCallSpans;

/**
* Map of all in-flight {@link Span} with {@link ClientCall} instance as key.
*/
private final WeakMap<ClientCall<?, ?>, Span<?>> delayedClientCallSpans;
private final ReferenceCounter<ClientCall<?, ?>, Span<?>> delayedClientCallSpans;

/**
* Map of all in-flight {@link Span} with {@link ClientCall.Listener} instance as key.
*/
private final WeakMap<ClientCall.Listener<?>, Span<?>> clientCallListenerSpans;
private final ReferenceCounter<ClientCall.Listener<?>, Span<?>> clientCallListenerSpans;

/**
* Map of all in-flight {@link Transaction} with {@link ServerCall.Listener} instance as key.
*/
private final WeakMap<ServerCall.Listener<?>, Transaction<?>> serverListenerTransactions;
private final ReferenceCounter<ServerCall.Listener<?>, Transaction<?>> serverListenerTransactions;

/**
* Map of all in-flight {@link Transaction} with {@link ServerCall} instance as key.
*/
private final WeakMap<ServerCall<?, ?>, Transaction<?>> serverCallTransactions;
private final ReferenceCounter<ServerCall<?, ?>, Transaction<?>> serverCallTransactions;

/**
* gRPC header cache used to minimize allocations
Expand All @@ -91,19 +91,18 @@ public static GrpcHelper getInstance() {
private final Tracer tracer;

public GrpcHelper() {
clientCallSpans = WeakConcurrentProviderImpl.createWeakSpanMap();
delayedClientCallSpans = WeakConcurrentProviderImpl.createWeakSpanMap();
clientCallListenerSpans = WeakConcurrentProviderImpl.createWeakSpanMap();
tracer = GlobalTracer.get();
clientCallSpans = tracer.createReferenceCounter();
delayedClientCallSpans = tracer.createReferenceCounter();
clientCallListenerSpans = tracer.createReferenceCounter();

serverListenerTransactions = WeakConcurrentProviderImpl.createWeakSpanMap();
serverCallTransactions = WeakConcurrentProviderImpl.createWeakSpanMap();
serverListenerTransactions = tracer.createReferenceCounter();
serverCallTransactions = tracer.createReferenceCounter();

headerCache = WeakConcurrent.buildMap();

headerSetter = new GrpcHeaderSetter();
headerGetter = new GrpcHeaderGetter();

tracer = GlobalTracer.get();
}

// transaction management (server part)
Expand Down Expand Up @@ -480,7 +479,7 @@ public void clientCallStartExit(@Nullable Span<?> spanFromEntry, ClientCall.List
}

public void cancelCall(ClientCall<?, ?> clientCall, @Nullable Throwable cause) {
WeakMap<ClientCall<?, ?>, Span<?>> clientCallMap = (isDelayedClientCall(clientCall)) ? delayedClientCallSpans : clientCallSpans;
ReferenceCounter<ClientCall<?, ?>, Span<?>> clientCallMap = (isDelayedClientCall(clientCall)) ? delayedClientCallSpans : clientCallSpans;
// we can't remove yet, in order to avoid reference decrement prematurely
Span<?> span = clientCallMap.get(clientCall);
if (span != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package co.elastic.apm.agent.grpc;

import co.elastic.apm.agent.AbstractInstrumentationTest;
import co.elastic.apm.agent.tracer.Outcome;
import io.grpc.Status;
import org.junit.jupiter.api.Test;
Expand All @@ -26,7 +27,7 @@

import static org.assertj.core.api.Assertions.assertThat;

class GrpcHelperTest {
class GrpcHelperTest extends AbstractInstrumentationTest {

@ParameterizedTest
@EnumSource(Status.Code.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
*/
package co.elastic.apm.agent.concurrent;

import co.elastic.apm.agent.collections.WeakConcurrentProviderImpl;
import co.elastic.apm.agent.common.ThreadUtils;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.sdk.DynamicTransformer;
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
import co.elastic.apm.agent.sdk.state.GlobalState;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.Tracer;
import co.elastic.apm.agent.tracer.reference.ReferenceCounter;

import javax.annotation.Nullable;
import java.util.ArrayList;
Expand All @@ -42,7 +42,7 @@
@GlobalState
public class JavaConcurrent {

private static final WeakMap<Object, AbstractSpan<?>> contextMap = WeakConcurrentProviderImpl.createWeakSpanMap();
private static final ReferenceCounter<Object, AbstractSpan<?>> contextMap = GlobalTracer.get().createReferenceCounter();

private static final List<Class<? extends ElasticApmInstrumentation>> RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION = Collections.
<Class<? extends ElasticApmInstrumentation>>singletonList(RunnableCallableForkJoinTaskInstrumentation.class);
Expand Down
Loading