Skip to content

Commit

Permalink
CDPD-35180. HADOOP-18091. S3A auditing leaks memory through ThreadLoc…
Browse files Browse the repository at this point in the history
…al references (apache#3930)

Adds a new map type, WeakReferenceMap, which stores weak
references to its values, and a subclass, WeakReferenceThreadMap,
which maps thread ID to value and so more closely resembles
a thread-local type.

Construct it with a factory method and optional callback
for notification on loss and regeneration.

 WeakReferenceThreadMap<WrappingAuditSpan> activeSpan =
      new WeakReferenceThreadMap<>(
          (k) -> getUnbondedSpan(),
          this::noteSpanReferenceLost);

This is used in ActiveAuditManagerS3A for span tracking.

Relates to
* HADOOP-17511. Add an Audit plugin point for S3A
* HADOOP-18094. Disable S3A auditing by default.

Contributed by Steve Loughran.

Change-Id: Ibf7bb082fd47298f7ebf46d92f56e80ca9b2aaf8

 Conflicts:
	hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Change-Id: Iaa092c89ab0be16f816b56be78d795ce618a8656
  • Loading branch information
steveloughran authored and mukund-thakur committed Jun 29, 2022
1 parent ec0fccf commit e21e220
Show file tree
Hide file tree
Showing 18 changed files with 1,355 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

Expand Down Expand Up @@ -69,11 +72,16 @@
* {@link #currentAuditContext()} to get the thread-local
* context for the caller, which can then be manipulated.
*
* For further information, especially related to memory consumption,
* read the document `auditing_architecture` in the `hadoop-aws` module.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class CommonAuditContext {

private static final Logger LOG = LoggerFactory.getLogger(
CommonAuditContext.class);

/**
* Process ID; currently built from UUID and timestamp.
*/
Expand All @@ -92,7 +100,7 @@ public final class CommonAuditContext {
* Supplier operations must themselves be thread safe.
*/
private final Map<String, Supplier<String>> evaluatedEntries =
new ConcurrentHashMap<>();
new ConcurrentHashMap<>(1);

static {
// process ID is fixed.
Expand All @@ -108,7 +116,7 @@ public final class CommonAuditContext {
* the span is finalized.
*/
private static final ThreadLocal<CommonAuditContext> ACTIVE_CONTEXT =
ThreadLocal.withInitial(() -> createInstance());
ThreadLocal.withInitial(CommonAuditContext::createInstance);

private CommonAuditContext() {
}
Expand All @@ -125,11 +133,21 @@ public Supplier<String> put(String key, String value) {

/**
 * Register a context entry whose value is evaluated on demand,
 * each time it is needed, by invoking the supplier.
 * <p>
 * Important: suppliers registered here are long-lived, so the
 * supplier function <i>MUST NOT</i> be part of/refer to
 * any object instance of significant memory size.
 * Applications SHOULD remove references when they are
 * no longer needed.
 * <p>
 * When the log is at TRACE, the key and the stack trace of the caller
 * are printed, to allow for debugging of any problems.
 * @param key key
 * @param value supplier of the new value
 * @return the supplier previously registered under the key, or null
 */
public Supplier<String> put(String key, Supplier<String> value) {
  if (LOG.isTraceEnabled()) {
    // Never thrown: the exception exists only so the logger prints the
    // caller's stack. Creating it is costly, hence the explicit level check.
    final Exception callerStack = new Exception(key);
    LOG.trace("Adding context entry {}", key, callerStack);
  }
  final Supplier<String> previous = evaluatedEntries.put(key, value);
  return previous;
}

Expand All @@ -138,6 +156,9 @@ public Supplier<String> put(String key, Supplier<String> value) {
* @param key key
*/
public void remove(String key) {
  // Parameterized logging performs the TRACE level check itself and
  // nothing costly is built for the message, so no explicit guard is needed.
  LOG.trace("Remove context entry {}", key);
  // Drop the entry; a key which is not present is simply a no-op.
  evaluatedEntries.remove(key);
}

Expand Down Expand Up @@ -168,7 +189,7 @@ public void reset() {
private void init() {

// thread 1 is dynamic
put(PARAM_THREAD1, () -> currentThreadID());
put(PARAM_THREAD1, CommonAuditContext::currentThreadID);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;

import org.apache.hadoop.util.WeakReferenceMap;

/**
 * A {@link WeakReferenceMap} whose keys are thread IDs, with
 * convenience accessors which operate on the entry of the
 * calling thread.
 * @param <V> value type of the map
 */
public class WeakReferenceThreadMap<V> extends WeakReferenceMap<Long, V> {

  /**
   * Create the map; both arguments are handed straight to the
   * superclass constructor.
   * @param factory function taking a thread ID and returning a value;
   * presumably used by the superclass to create missing entries
   * (confirm against {@link WeakReferenceMap}).
   * @param referenceLost optional callback taking a thread ID; may be null.
   */
  public WeakReferenceThreadMap(final Function<? super Long, ? extends V> factory,
      @Nullable final Consumer<? super Long> referenceLost) {
    super(factory, referenceLost);
  }

  /**
   * Get the ID of the calling thread; this is the key used by
   * the other per-thread methods of this class.
   * @return the ID of the current thread.
   */
  public long currentThreadId() {
    final Thread caller = Thread.currentThread();
    return caller.getId();
  }

  /**
   * Look up the entry keyed by the calling thread's ID.
   * @return whatever the superclass {@code get()} returns for that key.
   */
  public V getForCurrentThread() {
    final long threadId = currentThreadId();
    return get(threadId);
  }

  /**
   * Store a value under the calling thread's ID.
   * @param newVal value to store.
   * @return whatever the superclass {@code put()} returns for that key.
   */
  public V setForCurrentThread(V newVal) {
    final long threadId = currentThreadId();
    return put(threadId, newVal);
  }

  /**
   * Remove the entry keyed by the calling thread's ID.
   * @return whatever the superclass {@code remove()} returns for that key.
   */
  public V removeForCurrentThread() {
    final long threadId = currentThreadId();
    return remove(threadId);
  }

}
Loading

0 comments on commit e21e220

Please sign in to comment.