From a5ed7013e37fcaf8758b2bca6170edfcb63e5e1c Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 24 Jan 2022 16:47:50 +0000 Subject: [PATCH 01/12] HADOOP-18091. WeakReferenceMap A map where references can be garbage collected. This makes it a best-effort map Change-Id: I4622f51f87cfe0fb50629e447f6828c8b5072c8c --- .../apache/hadoop/util/WeakReferenceMap.java | 207 ++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java new file mode 100644 index 0000000000000..fb05a4491453b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.util; + +import java.lang.ref.WeakReference; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import org.apache.hadoop.classification.InterfaceAudience; + +import static java.util.Objects.requireNonNull; + +/** + * A map of keys type k to objects of type V which uses weak references, + * so does lot leak memory through long-lived references + * at the expense of losing references when GC takes place.. + * + * This class is intended be used instead of ThreadLocal storage when + * references are to be cleaned up when the instance holding. + * In this use case, the key is the Long key. + * + * Concurrency. + * The class assumes that map entries are rarely contended for when writing, + * and that not blocking other threads is more important than atomicity. + * - a ConcurrentHashMap is used to map keys to weak references, with + * all its guarantees. + * - there is no automatic pruning. + * - see {@link #create(Object)} for the concurrency semantics on entry creation. + */ +@InterfaceAudience.Private +public class WeakReferenceMap { + + /** + * The reference map. + */ + private final Map> map = new ConcurrentHashMap<>(); + + + /** + * Supplier of new instances. + */ + private final Function factory; + + /** + * Nullable callback when a get on a key got a weak reference back. + * The assumption is that this is for logging/stats, which is why + * no attempt is made to use the call as a supplier of a new value. + */ + private final Consumer referenceLost; + + /** + * instantiate. + * @param factory supplier of new instances + * @param referenceLost optional callback on lost references. 
+ */ + public WeakReferenceMap( + Function factory, + @Nullable final Consumer referenceLost) { + + this.factory = requireNonNull(factory); + this.referenceLost = referenceLost; + } + + /** + * Clear all entries. + */ + public void clear() { + map.clear(); + } + + /** + * look up the value, returning the possibly empty weak reference + * to a value, or null if no value was found. + * @param key key to look up + * @return null if there is no entry, a weak reference if found + */ + public WeakReference lookup(K key) { + return map.get(key); + } + + /** + * Get the value, creating if needed. + * @param key key. + * @return an instance. + */ + public V get(K key) { + final WeakReference current = lookup(key); + if (current != null && current.get() != null) { + // all good. + return current.get(); + } + // here, either no ref, or the value is null + if (current != null) { + noteLost(key); + } + return create(key); + } + + /** + * Create a new instance. + * The instance is created, added to the map and then the + * map value retrieved. + * This ensures that the reference returned is that in the map, + * even if there is more than one entry being created at the same time. + * @param key key + * @return the value + */ + public V create(K key) { + WeakReference newRef = new WeakReference<>( + requireNonNull(factory.apply(key))); + map.put(key, newRef); + return map.get(key).get(); + } + + /** + * Put a value under the key. + * @param key key + * @param value value + * @return any old non-null reference. + */ + public V put(K key, V value) { + return resolve(map.put(key, new WeakReference<>(value))); + } + + /** + * Remove any value under the key. + * @param key key + * @return any old non-null reference. + */ + public V remove(K key) { + return resolve(map.remove(key)); + } + + /** + * Does the map have a valid reference for this object? + * no-side effects: there's no attempt to notify or cleanup + * if the reference is null. 
+ * @param key key to look up + * @return true if there is a valid reference. + */ + public boolean containsKey(K key) { + final WeakReference current = lookup(key); + return resolve(current) != null; + } + + /** + * Given a possibly null weak reference, resolve + * its value. + * @param r reference to resolve + * @return the value or null + */ + private V resolve(WeakReference r) { + return r == null ? null : r.get(); + } + + /** + * Prune all null weak references, calling the referenceLost + * callback for each one. + * + * non-atomic and non-blocking. + * @return the number of entries pruned. + */ + public int prune() { + int count = 0; + final Iterator>> it = map.entrySet().iterator(); + while (it.hasNext()) { + final Map.Entry> next = it.next(); + if (next.getValue().get() == null) { + it.remove(); + count++; + noteLost(next.getKey()); + } + } + return count; + } + + /** + * Notify the reference lost callback. + * @param key key of lost reference + */ + private void noteLost(final K key) { + if (referenceLost != null) { + referenceLost.accept(key); + } + } +} From 4d6daf539931ad71a46639bd67d4960abce9b055 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 25 Jan 2022 13:48:27 +0000 Subject: [PATCH 02/12] HADOOP-18091. 
add a specific map for thread references in o.a.h.fs.impl if this is working, hope to use for iostats snapshots too Change-Id: I808ced6dc4bfbf2b961174dbdb544636a6253592 --- .../fs/impl/WeakReferenceThreadMap.java | 54 +++++++++++++++++++ .../apache/hadoop/util/WeakReferenceMap.java | 10 ++-- 2 files changed, 60 insertions(+), 4 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java new file mode 100644 index 0000000000000..a00e3fa47f425 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.util.function.Consumer; +import java.util.function.Function; +import javax.annotation.Nullable; + +import org.apache.hadoop.util.WeakReferenceMap; + +/** + * A WeakReferenceMap for threads. 
+ * @param value type of the map + */ +public class WeakReferenceThreadMap extends WeakReferenceMap { + + public WeakReferenceThreadMap(final Function factory, + @Nullable final Consumer referenceLost) { + super(factory, referenceLost); + } + + public V getForCurrentThread() { + return get(curentThreadId()); + } + + public V removeForCurrentThread() { + return remove(curentThreadId()); + } + + private long curentThreadId() { + return Thread.currentThread().getId(); + } + + public V setForCurrentThread(V newVal) { + return put(curentThreadId(), newVal); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java index fb05a4491453b..b90c4669f1729 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java @@ -32,7 +32,7 @@ import static java.util.Objects.requireNonNull; /** - * A map of keys type k to objects of type V which uses weak references, + * A map of keys type K to objects of type V which uses weak references, * so does lot leak memory through long-lived references * at the expense of losing references when GC takes place.. * @@ -106,10 +106,12 @@ public WeakReference lookup(K key) { */ public V get(K key) { final WeakReference current = lookup(key); - if (current != null && current.get() != null) { + V val = resolve(current); + if (val != null) { // all good. - return current.get(); + return val; } + // here, either no ref, or the value is null if (current != null) { noteLost(key); @@ -118,7 +120,7 @@ public V get(K key) { } /** - * Create a new instance. + * Create a new instance under a key. * The instance is created, added to the map and then the * map value retrieved. 
* This ensures that the reference returned is that in the map, From 928f33b54cfb39490b17ff1b394fa27b0c863c71 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 25 Jan 2022 17:40:28 +0000 Subject: [PATCH 03/12] HADOOP-18091. add a test which generates OOMs Creates managers in a separate thread, creates spans. runs out of memory Change-Id: Ic4ff382e4edfa37ec7feb46265ba5a63e1abb560 --- .../fs/s3a/audit/MemoryHungryAuditor.java | 124 ++++++++++++++++++ .../TestActiveAuditManagerThreadLeakage.java | 122 +++++++++++++++++ 2 files changed, 246 insertions(+) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java new file mode 100644 index 0000000000000..1ce505b9bb985 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + +import org.apache.hadoop.fs.s3a.audit.impl.AbstractAuditSpanImpl; +import org.apache.hadoop.fs.s3a.audit.impl.AbstractOperationAuditor; + + +/** + * An audit service which returns the {@link NoopSpan}. + * Even though the spans are no-ops, each span is still + * created with a unique span ID. + */ +public class MemoryHungryAuditor extends AbstractOperationAuditor { + + public static final String NAME = "org.apache.hadoop.fs.s3a.audit.MemoryHungryAuditor"; + + /** + * How big is each manager? + */ + public static final int MANAGER_SIZE = 10 * 1024 * 1024; + + /** + * How big is each span? + */ + public static final int SPAN_SIZE = 512 * 1024; + + private static final AtomicLong INSTANCE_COUNT = new AtomicLong(); + + private final AtomicLong spanCount = new AtomicLong(); + + private final byte[] data = new byte[MANAGER_SIZE]; + + /** + * unbonded span created on demand. + */ + private AuditSpanS3A unbondedSpan; + + + /** + * Constructor. 
+ */ + public MemoryHungryAuditor() { + super("MemoryHungryAuditor"); + INSTANCE_COUNT.incrementAndGet(); + } + + public long getSpanCount() { + return spanCount.get(); + } + + @Override + public AuditSpanS3A createSpan( + final String operation, + @Nullable final String path1, + @Nullable final String path2) { + spanCount.incrementAndGet(); + return new MemorySpan(createSpanID(), operation); + } + + @Override + public AuditSpanS3A getUnbondedSpan() { + if (unbondedSpan == null) { + unbondedSpan = new MemorySpan(createSpanID(), "unbonded"); + } + return unbondedSpan; + } + + @Override + public String toString() { + return String.format("%s instance %d span count %d", + super.toString(), + getInstanceCount(), + getSpanCount()); + } + + public static long getInstanceCount() { + return INSTANCE_COUNT.get(); + } + + /** + * A span which consumes a lot of memory. + */ + private static class MemorySpan extends AbstractAuditSpanImpl { + + private final byte[] data = new byte[SPAN_SIZE]; + + private MemorySpan(final String spanId, final String operationName) { + super(spanId, operationName); + } + + @Override + public AuditSpanS3A activate() { + + return this; + } + + @Override + public void deactivate() { + + } + + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java new file mode 100644 index 0000000000000..756771fffcf7b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A; +import org.apache.hadoop.test.AbstractHadoopTestBase; + +import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_SERVICE_CLASSNAME; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore; + + +/** + * This test attempts to recreate the OOM problems of + * HADOOP-18091. S3A auditing leaks memory through ThreadLocal references + * it does this by creating a thread pool, then + * creates and destroys FS instances, with threads in + * the pool (but not the main JUnit test thread) creating + * audit spans. + * + * With a custom audit span with a large memory footprint, + * memory demands will be high, and if the closed instances + * don't get cleaned up, memory runs out. + * GCs are forced. 
+ * It is critical no spans are created in the junit thread because they will last for + * the duration of the test JVM. + */ +@SuppressWarnings("ResultOfMethodCallIgnored") +public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase { + /** + * Logging. + */ + private static final Logger LOG = + LoggerFactory.getLogger(TestActiveAuditManagerThreadLeakage.class); + + private static final int MANAGER_COUNT = 1000; + private static final int THREAD_COUNT = 20; + private ExecutorService workers; + + @Test + public void testMemoryLeak() throws Throwable { + workers = Executors.newFixedThreadPool(THREAD_COUNT); + for (int i = 0; i < MANAGER_COUNT; i++) { + final long oneAuditConsumption = createDestroyOneAuditor(); + LOG.info("manager {} memory retained {}", i, oneAuditConsumption); + } + } + + private long createDestroyOneAuditor() throws Exception { + long original = Runtime.getRuntime().freeMemory(); + ExecutorService factory = Executors.newSingleThreadExecutor(); + + final Future action = factory.submit(this::createAuditorAndWorkers); + final long count = action.get(); + LOG.info("Span count {}", count); + + factory.shutdown(); + factory.awaitTermination(60, TimeUnit.SECONDS); + + System.gc(); + final long current = Runtime.getRuntime().freeMemory(); + return current - original; + + } + + private long createAuditorAndWorkers() + throws IOException, InterruptedException, ExecutionException { + final Configuration conf = new Configuration(false); + conf.set(AUDIT_SERVICE_CLASSNAME, MemoryHungryAuditor.NAME); + try (ActiveAuditManagerS3A auditManager = new ActiveAuditManagerS3A(emptyStatisticsStore())){ + auditManager.init(conf); + auditManager.start(); + LOG.info("Using {}", auditManager); + // no guarentee every thread gets used + + List> futures = new ArrayList<>(THREAD_COUNT); + for (int i = 0; i < THREAD_COUNT; i++) { + futures.add(workers.submit(() -> worker(auditManager))); + } + + for (Future future : futures) { + future.get(); + } + return 
((MemoryHungryAuditor)auditManager.getAuditor()).getSpanCount(); + } + + } + + private AuditSpanS3A worker(final ActiveAuditManagerS3A auditManager) throws IOException { + return auditManager.createSpan("span", null, null); + } +} From 7739be008ed894216d19bdef56ad21056b069df6 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 25 Jan 2022 18:47:51 +0000 Subject: [PATCH 04/12] HADOOP-18091. test feature creep no longer prunes anything, however. Change-Id: I71f4f36dcc0900531441b870f3f5de79c64fb2ee --- .../hadoop/fs/s3a/audit/OperationAuditor.java | 11 +++ .../s3a/audit/impl/ActiveAuditManagerS3A.java | 41 ++++++++-- .../fs/s3a/audit/MemoryHungryAuditor.java | 10 +++ .../TestActiveAuditManagerThreadLeakage.java | 79 +++++++++++++++---- 4 files changed, 121 insertions(+), 20 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java index 672bcdf7f9c77..10c8f871cad32 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java @@ -70,4 +70,15 @@ default boolean checkAccess(Path path, S3AFileStatus status, FsAction mode) * @return ID */ String getAuditorId(); + + /** + * Span reference lost from GC operations. + * This is only called when an attempt is made to retrieve on + * the active thread or when a prune operation is cleaning up. + * + * @param threadId thread ID. 
+ */ + default void noteSpanReferenceLost(long threadId) { + + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java index 25f97c07a2f98..c53569368179d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java @@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.audit.AWSAuditEventCallbacks; @@ -88,6 +89,11 @@ * then the IOStatistics counter for {@code AUDIT_FAILURE} * is incremented. * + * Uses the WeakReferenceThreadMap to store spans for threads. + * Provided a calling class retains a reference to the span, + * the active span will be retained. + * + * */ @InterfaceAudience.Private public final class ActiveAuditManagerS3A @@ -131,8 +137,11 @@ public final class ActiveAuditManagerS3A * Thread local span. This defaults to being * the unbonded span. 
*/ - private final ThreadLocal activeSpan = - ThreadLocal.withInitial(() -> getUnbondedSpan()); + + private final WeakReferenceThreadMap activeSpan = + new WeakReferenceThreadMap<>( + (k) -> getUnbondedSpan(), + this::noteSpanReferenceLost); /** * Destination for recording statistics, especially duration/count of @@ -225,7 +234,7 @@ public AuditSpanS3A getActiveAuditSpan() { * @return the active WrappingAuditSpan */ private WrappingAuditSpan activeSpan() { - return activeSpan.get(); + return activeSpan.getForCurrentThread(); } /** @@ -247,13 +256,35 @@ private AuditSpanS3A setActiveThreadSpan(AuditSpanS3A span) { */ private WrappingAuditSpan switchToActiveSpan(WrappingAuditSpan span) { if (span != null && span.isValidSpan()) { - activeSpan.set(span); + activeSpan.setForCurrentThread(span); } else { - activeSpan.set(unbondedSpan); + activeSpan.removeForCurrentThread(); } return activeSpan(); } + /** + * Span reference lost from GC operations. + * This is only called when an attempt is made to retrieve on + * the active thread or when a prune operation is cleaning up. + * + * @param threadId thread ID. + */ + private void noteSpanReferenceLost(long threadId) { + auditor.noteSpanReferenceLost(threadId); + } + + /** + * Prune all null weak references, calling the referenceLost + * callback for each one. + * + * non-atomic and non-blocking. + * @return the number of entries pruned. 
+ */ + public int prune() { + return activeSpan.prune(); + } + /** * The Span ID in the audit manager is the ID of the auditor, * which can be used in the filesystem toString() method diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java index 1ce505b9bb985..baedda25515f2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java @@ -21,6 +21,9 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.fs.s3a.audit.impl.AbstractAuditSpanImpl; import org.apache.hadoop.fs.s3a.audit.impl.AbstractOperationAuditor; @@ -34,6 +37,8 @@ public class MemoryHungryAuditor extends AbstractOperationAuditor { public static final String NAME = "org.apache.hadoop.fs.s3a.audit.MemoryHungryAuditor"; + private static final Logger LOG = + LoggerFactory.getLogger(MemoryHungryAuditor.class); /** * How big is each manager? 
*/ @@ -93,6 +98,11 @@ public String toString() { getSpanCount()); } + @Override + public void noteSpanReferenceLost(final long threadId) { + LOG.info("Span lost for thread {}", threadId); + } + public static long getInstanceCount() { return INSTANCE_COUNT.get(); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java index 756771fffcf7b..39e7c70c148d8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java @@ -20,14 +20,16 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,34 +68,48 @@ public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase private static final int MANAGER_COUNT = 1000; private static final int THREAD_COUNT = 20; private ExecutorService workers; + private int pruneCount; @Test public void testMemoryLeak() throws Throwable { workers = Executors.newFixedThreadPool(THREAD_COUNT); for (int i = 0; i < MANAGER_COUNT; i++) { - final long oneAuditConsumption = createDestroyOneAuditor(); + final long oneAuditConsumption = createDestroyOneAuditor(i); LOG.info("manager {} memory retained {}", i, oneAuditConsumption); } + + // pruning must have taken place. + // that's somewhat implicit in the test not going OOM. 
+ // but if memory allocation in test runs increase, it + // may cease to hold. in which case: create more + // audit managers + LOG.info("Totel prune count {}", pruneCount); + + Assertions.assertThat(pruneCount) + .describedAs("Totel prune count") + .isGreaterThan(0); + + } - private long createDestroyOneAuditor() throws Exception { + private long createDestroyOneAuditor(final int instance) throws Exception { long original = Runtime.getRuntime().freeMemory(); ExecutorService factory = Executors.newSingleThreadExecutor(); - final Future action = factory.submit(this::createAuditorAndWorkers); - final long count = action.get(); - LOG.info("Span count {}", count); + final Future action = factory.submit(this::createAuditorAndWorkers); + final int pruned = action.get(); + pruneCount += pruned; factory.shutdown(); factory.awaitTermination(60, TimeUnit.SECONDS); - System.gc(); + final long current = Runtime.getRuntime().freeMemory(); return current - original; } - private long createAuditorAndWorkers() + private int createAuditorAndWorkers() throws IOException, InterruptedException, ExecutionException { final Configuration conf = new Configuration(false); conf.set(AUDIT_SERVICE_CLASSNAME, MemoryHungryAuditor.NAME); @@ -103,20 +119,53 @@ private long createAuditorAndWorkers() LOG.info("Using {}", auditManager); // no guarentee every thread gets used - List> futures = new ArrayList<>(THREAD_COUNT); + List> futures = new ArrayList<>(THREAD_COUNT); + Set threadIds = new HashSet<>(); for (int i = 0; i < THREAD_COUNT; i++) { - futures.add(workers.submit(() -> worker(auditManager))); + futures.add(workers.submit(() -> spanningOperation(auditManager))); } - for (Future future : futures) { - future.get(); + for (Future future : futures) { + final Result r = future.get(); + threadIds.add(r.getThreadId()); } - return ((MemoryHungryAuditor)auditManager.getAuditor()).getSpanCount(); + // get rid of any references to spans other than in the weak ref map + futures = null; + // gc + 
System.gc(); + final int pruned = auditManager.prune(); + LOG.info("{} executed across {} threads and pruned {} entries", + auditManager, threadIds.size(), pruned); + return pruned; } } - private AuditSpanS3A worker(final ActiveAuditManagerS3A auditManager) throws IOException { - return auditManager.createSpan("span", null, null); + private Result spanningOperation(final ActiveAuditManagerS3A auditManager) + throws IOException, InterruptedException { + final AuditSpanS3A auditSpan = auditManager.getActiveAuditSpan(); + Assertions.assertThat(auditSpan) + .describedAs("audit span for current thread") + .isNotNull(); + Thread.sleep(200); + return new Result(Thread.currentThread().getId(), auditSpan); + } + + private static final class Result { + private final long threadId; + private final AuditSpanS3A auditSpan; + + public Result(final long threadId, final AuditSpanS3A auditSpan) { + this.threadId = threadId; + this.auditSpan = auditSpan; + } + + public long getThreadId() { + return threadId; + } + + public AuditSpanS3A getAuditSpan() { + return auditSpan; + } } } From 4240e9fcdc6676317cf48e5af71a2292099c0927 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 25 Jan 2022 18:58:43 +0000 Subject: [PATCH 05/12] HADOOP-18091. test fixed again. need to create lots of spans during the life of an auditor so the memory consumption is enough for GC to care. 
Change-Id: Iad3a350b8243031ad158c966e130bab77ab59fb3 --- .../TestActiveAuditManagerThreadLeakage.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java index 39e7c70c148d8..519ad7e5f55d3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java @@ -65,7 +65,7 @@ public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase private static final Logger LOG = LoggerFactory.getLogger(TestActiveAuditManagerThreadLeakage.class); - private static final int MANAGER_COUNT = 1000; + private static final int MANAGER_COUNT = 500; private static final int THREAD_COUNT = 20; private ExecutorService workers; private int pruneCount; @@ -141,13 +141,21 @@ private int createAuditorAndWorkers() } + /** + * The operation in each worker thread. + * @param auditManager audit manager + * @return result of the call + * @throws IOException troluble + */ private Result spanningOperation(final ActiveAuditManagerS3A auditManager) - throws IOException, InterruptedException { - final AuditSpanS3A auditSpan = auditManager.getActiveAuditSpan(); + throws IOException { + auditManager.getActiveAuditSpan(); + final AuditSpanS3A auditSpan = + auditManager.createSpan("span", null, null); Assertions.assertThat(auditSpan) .describedAs("audit span for current thread") .isNotNull(); - Thread.sleep(200); + Thread.yield(); return new Result(Thread.currentThread().getId(), auditSpan); } From af6a430389a0ab142adeac17529bd47af74a3210 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 26 Jan 2022 18:01:15 +0000 Subject: [PATCH 06/12] HADOOP-18091. 
tests for GC, map semantics. docs * hadoop common TestWeakReferenceMap for map semantics and recreating of entries on demand. * TestActiveAuditManagerThreadLeakage test enhanced a lot with tests of demand creation etc * auditing docs brought up to date * CommonAuditContext docs cover memory leak risks, as does audit arch. the put() call will log call stack if logging at trace, to provide a way to debug problems, if they ever surface. * s3a committers explicitly remove jobid in commit/abort ops. (manifest committer should will do the same) * auditing is still turned off for consistency with Hadoop 3.3.2 its there for support calls, people can deploy it in production when they see benefits, but there's no cost/risk to others. Change-Id: I63ec9ec35d80d25d9e3cf1eedaca3aff595a06fa --- .../hadoop/fs/audit/CommonAuditContext.java | 27 ++- .../apache/hadoop/util/WeakReferenceMap.java | 49 +++++ .../fs/audit/TestCommonAuditContext.java | 18 ++ .../hadoop/util/TestWeakReferenceMap.java | 199 ++++++++++++++++++ .../fs/s3a/audit/AWSRequestAnalyzer.java | 2 +- .../s3a/audit/impl/ActiveAuditManagerS3A.java | 28 ++- .../fs/s3a/commit/AbstractS3ACommitter.java | 3 + .../commit/magic/MagicS3GuardCommitter.java | 2 + .../s3a/commit/staging/StagingCommitter.java | 2 + .../markdown/tools/hadoop-aws/auditing.md | 11 +- .../tools/hadoop-aws/auditing_architecture.md | 110 +++++++++- .../fs/s3a/audit/MemoryHungryAuditor.java | 8 +- .../TestActiveAuditManagerThreadLeakage.java | 155 ++++++++++++-- .../src/test/resources/log4j.properties | 3 + 14 files changed, 571 insertions(+), 46 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java index 11681546e3d0a..e188e168e5313 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java @@ -24,6 +24,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -69,11 +72,16 @@ * {@link #currentAuditContext()} to get the thread-local * context for the caller, which can then be manipulated. * + * For further information, especially related to memory consumption, + * read the document `auditing_architecture` in the `hadoop-aws` module. */ @InterfaceAudience.Public @InterfaceStability.Unstable public final class CommonAuditContext { + private static final Logger LOG = LoggerFactory.getLogger( + CommonAuditContext.class); + /** * Process ID; currently built from UUID and timestamp. */ @@ -92,7 +100,7 @@ public final class CommonAuditContext { * Supplier operations must themselves be thread safe. */ private final Map> evaluatedEntries = - new ConcurrentHashMap<>(); + new ConcurrentHashMap<>(1); static { // process ID is fixed. @@ -108,7 +116,7 @@ public final class CommonAuditContext { * the span is finalized. */ private static final ThreadLocal ACTIVE_CONTEXT = - ThreadLocal.withInitial(() -> createInstance()); + ThreadLocal.withInitial(CommonAuditContext::createInstance); private CommonAuditContext() { } @@ -125,11 +133,21 @@ public Supplier put(String key, String value) { /** * Put a context entry dynamically evaluated on demand. + * Important: as these supplier methods are long-lived, + * the supplier function MUST NOT be part of/refer to + * any object instance of significant memory size. + * Applications SHOULD remove references when they are + * no longer needed. 
+ * When logged at TRACE, prints the key and stack trace of the caller, + * to allow for debugging of any problems. * @param key key * @param value new value * @return old value or null */ public Supplier put(String key, Supplier value) { + if (LOG.isTraceEnabled()) { + LOG.trace("Adding context entry {}", key, new Exception(key)); + } return evaluatedEntries.put(key, value); } @@ -138,6 +156,9 @@ public Supplier put(String key, Supplier value) { * @param key key */ public void remove(String key) { + if (LOG.isTraceEnabled()) { + LOG.trace("Remove context entry {}", key); + } evaluatedEntries.remove(key); } @@ -168,7 +189,7 @@ public void reset() { private void init() { // thread 1 is dynamic - put(PARAM_THREAD1, () -> currentThreadID()); + put(PARAM_THREAD1, CommonAuditContext::currentThreadID); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java index b90c4669f1729..323909fe4ece0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; @@ -69,6 +70,16 @@ public class WeakReferenceMap { */ private final Consumer referenceLost; + /** + * Counter of references lost. + */ + private final AtomicLong referenceLostCount = new AtomicLong(); + + /** + * Counter of entries created. + */ + private final AtomicLong entriesCreatedCount = new AtomicLong(); + /** * instantiate. 
* @param factory supplier of new instances @@ -82,6 +93,19 @@ public WeakReferenceMap( this.referenceLost = referenceLost; } + @Override + public String toString() { + return "WeakReferenceMap{" + + "size=" + size() + + ", referenceLostCount=" + referenceLostCount + + ", entriesCreatedCount=" + entriesCreatedCount + + '}'; + } + + public int size() { + return map.size(); + } + /** * Clear all entries. */ @@ -129,6 +153,7 @@ public V get(K key) { * @return the value */ public V create(K key) { + entriesCreatedCount.incrementAndGet(); WeakReference newRef = new WeakReference<>( requireNonNull(factory.apply(key))); map.put(key, newRef); @@ -137,6 +162,9 @@ public V create(K key) { /** * Put a value under the key. + * A null value can be put, though on a get() call + * a new entry is generated + * * @param key key * @param value value * @return any old non-null reference. @@ -202,8 +230,29 @@ public int prune() { * @param key key of lost reference */ private void noteLost(final K key) { + // incrment local counter + referenceLostCount.incrementAndGet(); + + // and call any notification function supplied in the constructor if (referenceLost != null) { referenceLost.accept(key); } } + + /** + * Get count of references lost as detected + * during prune() or get() calls. + * @return count of references lost + */ + public final long getReferenceLostCount() { + return referenceLostCount.get(); + } + + /** + * Get count of entries created on demand. 
+ * @return count of entries created + */ + public final long getEntriesCreatedCount() { + return entriesCreatedCount.get(); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java index 798841a2d6905..9782eb276d306 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java @@ -132,6 +132,15 @@ private AbstractStringAssert assertContextValue(final String key) { .describedAs("Value of context element %s", key) .isNotBlank(); } + /** + * Assert a context value is null. + * @param key key to look up + */ + private void assertContextValueIsNull(final String key) { + assertThat(context.get(key)) + .describedAs("Value of context element %s", key) + .isNull(); + } @Test public void testNoteEntryPoint() throws Throwable { @@ -158,4 +167,13 @@ private AbstractStringAssert assertGlobalEntry(final String key) { return anAssert; } + @Test + public void testAddRemove() throws Throwable { + final String key = "testAddRemove"; + assertContextValueIsNull(key); + context.put(key, key); + assertContextValue(key).isEqualTo(key); + context.remove(key); + assertContextValueIsNull(key); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java new file mode 100644 index 0000000000000..cf15743ea06a1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import java.util.ArrayList; +import java.util.List; + +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.test.AbstractHadoopTestBase; + +/** + * Test {@link WeakReferenceMap}. + * There's no attempt to force GC here, so the tests are + * more about the basic behavior not the handling of empty references. + */ +public class TestWeakReferenceMap extends AbstractHadoopTestBase { + + public static final String FACTORY_STRING = "recreated %d"; + + /** + * The map to test. + */ + private WeakReferenceMap referenceMap; + + /** + * List of references notified of loss. + */ + private List lostReferences; + + @Before + public void setup() { + lostReferences = new ArrayList<>(); + referenceMap = new WeakReferenceMap<>( + this::factory, + this::referenceLost); + } + + /** + * Reference lost callback. + * @param key key lost + */ + private void referenceLost(Integer key) { + lostReferences.add(key); + } + + + /** + * Basic insertions and lookups of those values. 
+ */ + @Test + public void testBasicOperationsWithValidReferences() { + + referenceMap.put(1, "1"); + referenceMap.put(2, "2"); + assertMapSize(2); + assertMapContainsKey(1); + assertMapEntryEquals(1, "1"); + assertMapEntryEquals(2, "2"); + // overwrite + referenceMap.put(1, "3"); + assertMapEntryEquals(1, "3"); + + // remove an entry + referenceMap.remove(1); + assertMapDoesNotContainKey(1); + assertMapSize(1); + + // clear the map + referenceMap.clear(); + assertMapSize(0); + } + + /** + * pruning removes null entries, leaves the others alone. + */ + @Test + public void testPruneNullEntries() { + referenceMap.put(1, "1"); + assertPruned(0); + referenceMap.put(2, null); + assertMapSize(2); + assertPruned(1); + assertMapSize(1); + assertMapDoesNotContainKey(2); + assertMapEntryEquals(1, "1"); + assertLostCount(1); + } + + /** + * Demand create entries. + */ + @Test + public void testDemandCreateEntries() { + + // ask for an unknown key and expect a generated value + assertMapEntryEquals(1, factory(1)); + assertMapSize(1); + assertMapContainsKey(1); + assertLostCount(0); + + // an empty ref has the same outcome + referenceMap.put(2, null); + assertMapEntryEquals(2, factory(2)); + // but the lost coun goes up + assertLostCount(1); + + } + + /** + * Assert that the value of a map entry is as expected. + * Will trigger entry creation if the key is absent. + * @param key key + * @param val expected valued + */ + private void assertMapEntryEquals(int key, String val) { + Assertions.assertThat(referenceMap.get(key)) + .describedAs("map enty of key %d", key) + .isEqualTo(val); + } + + /** + * Assert that a map entry is present. + * @param key key + */ + private void assertMapContainsKey(int key) { + Assertions.assertThat(referenceMap.containsKey(key)) + .describedAs("map enty of key %d should be present", key) + .isTrue(); + } + + /** + * Assert that a map entry is not present. 
+ * @param key key + */ + private void assertMapDoesNotContainKey(int key) { + Assertions.assertThat(referenceMap.containsKey(key)) + .describedAs("map enty of key %d should be absent", key) + .isFalse(); + } + + /** + * Assert map size. + * @param size expected size. + */ + private void assertMapSize(int size) { + Assertions.assertThat(referenceMap.size()) + .describedAs("size of map %s", referenceMap) + .isEqualTo(size); + } + + /** + * Assert prune returned the given count. + * @param count expected count. + */ + private void assertPruned(int count) { + Assertions.assertThat(referenceMap.prune()) + .describedAs("number of entries pruned from map %s", referenceMap) + .isEqualTo(count); + } + + /** + * Assert number of entries lost matches expected count. + * @param count expected count. + */ + private void assertLostCount(int count) { + Assertions.assertThat(lostReferences) + .describedAs("number of entries lost from map %s", referenceMap) + .hasSize(count); + } + + /** + * Factory operation. + * @param key map key + * @return a string + */ + private String factory(Integer key) { + return String.format(FACTORY_STRING, key); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index a5a6dbc84797f..f2963d7319205 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -210,7 +210,7 @@ private RequestInfo writing(final String verb, * @param request request * @return true if the transfer manager creates them. 
*/ - public static final boolean + public static boolean isRequestNotAlwaysInSpan(final Object request) { return request instanceof CopyPartRequest || request instanceof CompleteMultipartUploadRequest diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java index c53569368179d..14fb3e679a1ad 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; @@ -138,7 +139,7 @@ public final class ActiveAuditManagerS3A * the unbonded span. */ - private final WeakReferenceThreadMap activeSpan = + private final WeakReferenceThreadMap activeSpanMap = new WeakReferenceThreadMap<>( (k) -> getUnbondedSpan(), this::noteSpanReferenceLost); @@ -187,6 +188,13 @@ protected void serviceStart() throws Exception { LOG.debug("Started audit service {}", auditor); } + @Override + protected void serviceStop() throws Exception { + // clear all references. 
+ activeSpanMap.clear(); + super.serviceStop(); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder(super.toString()); @@ -234,7 +242,7 @@ public AuditSpanS3A getActiveAuditSpan() { * @return the active WrappingAuditSpan */ private WrappingAuditSpan activeSpan() { - return activeSpan.getForCurrentThread(); + return activeSpanMap.getForCurrentThread(); } /** @@ -256,9 +264,9 @@ private AuditSpanS3A setActiveThreadSpan(AuditSpanS3A span) { */ private WrappingAuditSpan switchToActiveSpan(WrappingAuditSpan span) { if (span != null && span.isValidSpan()) { - activeSpan.setForCurrentThread(span); + activeSpanMap.setForCurrentThread(span); } else { - activeSpan.removeForCurrentThread(); + activeSpanMap.removeForCurrentThread(); } return activeSpan(); } @@ -282,7 +290,17 @@ private void noteSpanReferenceLost(long threadId) { * @return the number of entries pruned. */ public int prune() { - return activeSpan.prune(); + return activeSpanMap.prune(); + } + + /** + * Get the map of threads to active spans; allows + * for testing of weak reference resolution after GC. 
+ * @return the span map + */ + @VisibleForTesting + public WeakReferenceThreadMap getActiveSpanMap() { + return activeSpanMap; } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index b2c0c5e1b8b9a..f08d6448e4993 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -904,6 +904,8 @@ public void commitJob(JobContext context) throws IOException { jobCompleted(false); abortJobInternal(context, true); throw e; + } finally { + resetCommonContext(); } } @@ -946,6 +948,7 @@ protected void cleanup(JobContext context, } finally { destroyThreadPool(); cleanupStagingDirs(); + resetCommonContext(); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 34bbfd4ed76e2..c1ecd7d6b9b53 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -169,6 +169,7 @@ public void commitTask(TaskAttemptContext context) throws IOException { // delete the task attempt so there's no possibility of a second attempt deleteTaskAttemptPathQuietly(context); destroyThreadPool(); + resetCommonContext(); } getCommitOperations().taskCompleted(true); LOG.debug("aggregate statistics\n{}", @@ -252,6 +253,7 @@ public void abortTask(TaskAttemptContext context) throws IOException { attemptPath.getFileSystem(context.getConfiguration()), attemptPath, true); destroyThreadPool(); + resetCommonContext(); } } diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 6d75cb2dd3c79..121ea7f851c02 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -600,6 +600,7 @@ public void commitTask(TaskAttemptContext context) throws IOException { throw e; } finally { destroyThreadPool(); + resetCommonContext(); } getCommitOperations().taskCompleted(true); } @@ -739,6 +740,7 @@ public void abortTask(TaskAttemptContext context) throws IOException { throw e; } finally { destroyThreadPool(); + resetCommonContext(); } } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md index 54fbdd11a8609..17f0c9ad54326 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md @@ -24,13 +24,16 @@ this document covers its use. ## Important: Auditing is disabled by default -Due to a memory leak from the use of `ThreadLocal` fields, this auditing feature leaks memory as S3A filesystem -instances are created and deleted. -This causes problems in long-lived processes which either do not re-use filesystem +Due to a memory leak from the use of `ThreadLocal` fields, this auditing feature +leaked memory as S3A filesystem instances were created and deleted. +This caused problems in long-lived processes which either do not re-use filesystem instances, or attempt to delete all instances belonging to specific users. See [HADOOP-18091](https://issues.apache.org/jira/browse/HADOOP-18091) _S3A auditing leaks memory through ThreadLocal references_. 
-To avoid these memory leaks, auditing is disabled by default. +To avoid these memory leaks, auditing was disabled by default in the hadoop 3.3.2 release. + +Although these memory leaks have been fixed, for safety and consistency, +auditing is disabled by default. To turn auditing on, set `fs.s3a.audit.enabled` to `true`. diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md index a4f4fe445ec96..1238182e7f894 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md @@ -119,16 +119,81 @@ The auditor then creates and returns a span for the specific operation. The AuditManagerS3A will automatically activate the span returned by the auditor (i.e. assign it the thread local variable tracking the active span in each thread). -### Memory Leakage through `ThreadLocal` use +### Memory Leakage through `ThreadLocal` misuse -This architecture contains a critical defect, +The original implementation of the integration with the S3AFileSystem class +contained a critical defect, [HADOOP-18091](https://issues.apache.org/jira/browse/HADOOP-18091) _S3A auditing leaks memory through ThreadLocal references_. -The code was written assuming that when the `ActiveAuditManagerS3A` service is -stopped, it's `ThreadLocal` fields would be freed. -In fact, they are retained until the threads with references are terminated. +The original code was written with the assumption that when the `ActiveAuditManagerS3A` service was +garbage collected, references in its `ThreadLocal` field would be freed. +In fact, they are retained until all threads with references are terminated. 
+If any long-lived thread had performed an s3 operation which created a span, +a reference back to the audit manager instance was created +*whose lifetime was that of the thread*. + +In short-lived processes, and long-lived processes where a limited set of +`S3AFileSystem` instances were reused, this had no adverse effect. +Indeed, if the filesystem instances were retained in the cache until +the process was shut down, there would be strong references to the owning +`S3AFileSystem` instance anyway. + +Where it did have problems was when the following conditions were met: +1. Process was long-lived. +2. Long-lived threads in the process invoked filesystem operations on `s3a://` URLs. +3. Either `S3AFileSystem` instances were created repeatedly, rather than retrieved + from the cache of active instances. +4. Or, after a query for a specific user was completed, + `FileSystem.closeAllForUGI(UserGroupInformation)` was invoked to remove all + cached FS instances of that user. + +Conditions 1, 2 and 4 are exactly those which long-lived Hive services can +encounter. + +_Auditing was disabled by default until a fix was implemented._ + +The memory leak has been fixed using a new class `org.apache.hadoop.util.WeakReferenceMap` +to store a map of thread IDs to active spans. When the S3A filesystem is closed, +its audit manager service is stopped and all references to spans are removed from the +map of thread ID to span. + +Weak References are used for the map so that span references do not consume memory even if +threads are terminated without resetting the span reference of that thread. +There is therefore a theoretical risk that if a garbage collection takes place during +execution of a spanned operation, the reference will be lost. + +This is not considered an issue as all bounded entry points into the S3A filesystem +retain a strong reference to their audit span.
+ +All entry points which return an object which can invoke s3 operations (input and output +streams, list iterators, etc.) also retain a strong reference to their span, a reference +they activate before performing S3 operations. +A factory method is provided to demand-generate a new span if, somehow, these conditions +are not met. The "unbounded span" is used here. +Except in deployments where `fs.s3a.audit.reject.out.of.span.operations` is true, +invoking S3 operations within the unbounded span is permitted. +That option is set to `true` within S3A test suites. +Therefore it is unlikely that any operations are invoked in unbounded spans except +for the special case of copy operations invoked by the transfer manager threads. +Those are already ignored in the logging auditor, whose unbounded span ignores +requests which `AWSRequestAnalyzer.isRequestNotAlwaysInSpan()` indicates +may happen outside of a span. +This is restricted to bucket location probes possibly performed by the SDK +on instantiation, and copy part/complete calls. + + +```java + public static boolean + isRequestNotAlwaysInSpan(final Object request) { + return request instanceof CopyPartRequest + || request instanceof CompleteMultipartUploadRequest + || request instanceof GetBucketLocationRequest; + } +``` + +Auditing is still disabled by default for consistency with previous releases. + -This is why auditing is now disabled by default until a fix is implemented. ### Class `org.apache.hadoop.fs.audit.CommonAuditContext` @@ -149,6 +214,39 @@ Spans may be used on different thread from that which they were created. Spans MUST always use the values from the `currentAuditContext()` in the creation thread. +#### Memory Usage of `CommonAuditContext` + +The `CommonAuditContext` map has a `ThreadLocal` field to store global +information which is intended to span multiple operations across multiple +filesystems, for example the MapReduce or Spark job ID, which is set +in the S3A committers.
+ +Applications and Hadoop code MUST NOT attach context entries +which directly or indirectly consume lots of memory, as the life +of that memory use will become that of the thread. + +Applications and Hadoop code SHOULD remove context entries when +no longer needed. + +If memory leakage is suspected here, set the log +`org.apache.hadoop.fs.audit.CommonAuditContext` to `TRACE` +to log the origin of operations which add context entries. + +This will produce a log entry whose stack trace will show the caller chain. +``` +2022-01-26 16:10:28,384 TRACE audit.CommonAuditContext (CommonAuditContext.java:put(149)) - Adding context entry t1 +java.lang.Exception: t1 + at org.apache.hadoop.fs.audit.CommonAuditContext.put(CommonAuditContext.java:149) + at org.apache.hadoop.fs.audit.CommonAuditContext.init(CommonAuditContext.java:192) + at org.apache.hadoop.fs.audit.CommonAuditContext.createInstance(CommonAuditContext.java:210) + at org.apache.hadoop.fs.audit.CommonAuditContext.lambda$static$0(CommonAuditContext.java:119) + at java.lang.ThreadLocal$SuppliedThreadLocal.initialValue(ThreadLocal.java:284) + at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180) + at java.lang.ThreadLocal.get(ThreadLocal.java:170) + at org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext(CommonAuditContext.java:219) + at org.apache.hadoop.fs.audit.TestCommonAuditContext.(TestCommonAuditContext.java:54) +``` + ### class `NoopAuditor` diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java index baedda25515f2..63e7922001e8b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java @@ -29,9 +29,7 @@ /** - * An audit service which returns the {@link NoopSpan}.
- * Even though the spans are no-ops, each span is still - * created with a unique span ID. + * An audit service which consumes lots of memory. */ public class MemoryHungryAuditor extends AbstractOperationAuditor { @@ -110,7 +108,7 @@ public static long getInstanceCount() { /** * A span which consumes a lot of memory. */ - private static class MemorySpan extends AbstractAuditSpanImpl { + private static final class MemorySpan extends AbstractAuditSpanImpl { private final byte[] data = new byte[SPAN_SIZE]; @@ -120,13 +118,11 @@ private MemorySpan(final String spanId, final String operationName) { @Override public AuditSpanS3A activate() { - return this; } @Override public void deactivate() { - } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java index 519ad7e5f55d3..f32540245dd9a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.audit; import java.io.IOException; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; import org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A; import org.apache.hadoop.test.AbstractHadoopTestBase; @@ -54,8 +56,8 @@ * memory demands will be high, and if the closed instances * don't get cleaned up, memory runs out. * GCs are forced. - * It is critical no spans are created in the junit thread because they will last for - * the duration of the test JVM. 
+ * It is critical no spans are created in the junit thread because they will + * last for the duration of the test JVM. */ @SuppressWarnings("ResultOfMethodCallIgnored") public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase { @@ -70,11 +72,42 @@ public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase private ExecutorService workers; private int pruneCount; + /** + * As audit managers are created they are added to the list, + * so we can verify they get GC'd.` + */ + private final List> auditManagers = + new ArrayList<>(); + + /** + * When the service is stopped, the span map is + * cleared immediately. + */ + @Test + public void testSpanMapClearedInServiceStop() throws IOException { + try (ActiveAuditManagerS3A auditManager = + new ActiveAuditManagerS3A(emptyStatisticsStore())) { + auditManager.init(createMemoryHungryConfiguration()); + auditManager.start(); + auditManager.getActiveAuditSpan(); + // get the span map + final WeakReferenceThreadMap spanMap + = auditManager.getActiveSpanMap(); + Assertions.assertThat(spanMap.size()) + .describedAs("map size") + .isEqualTo(1); + auditManager.stop(); + Assertions.assertThat(spanMap.size()) + .describedAs("map size") + .isEqualTo(0); + } + } + @Test public void testMemoryLeak() throws Throwable { workers = Executors.newFixedThreadPool(THREAD_COUNT); for (int i = 0; i < MANAGER_COUNT; i++) { - final long oneAuditConsumption = createDestroyOneAuditor(i); + final long oneAuditConsumption = createAndTestOneAuditor(); LOG.info("manager {} memory retained {}", i, oneAuditConsumption); } @@ -87,18 +120,27 @@ public void testMemoryLeak() throws Throwable { Assertions.assertThat(pruneCount) .describedAs("Totel prune count") - .isGreaterThan(0); - + .isNotZero(); + // now count number of audit managers GC'd + // some must have been GC'd, showing that no other + // references are being retained internally. 
+ Assertions.assertThat(auditManagers.stream() + .filter((r) -> r.get() == null) + .count()) + .describedAs("number of audit managers garbage collected") + .isNotZero(); } - private long createDestroyOneAuditor(final int instance) throws Exception { + /** + * Create, use and then shutdown one auditor in a unique thread. + * @return memory consumed/released +f */ + private long createAndTestOneAuditor() throws Exception { long original = Runtime.getRuntime().freeMemory(); ExecutorService factory = Executors.newSingleThreadExecutor(); - final Future action = factory.submit(this::createAuditorAndWorkers); - final int pruned = action.get(); - pruneCount += pruned; + pruneCount += factory.submit(this::createAuditorAndWorkers).get(); factory.shutdown(); factory.awaitTermination(60, TimeUnit.SECONDS); @@ -109,38 +151,109 @@ private long createDestroyOneAuditor(final int instance) throws Exception { } + /** + * This is the core of the test. + * Create an audit manager and spans across multiple threads. + * The spans are created in the long-lived pool, so if there is + * any bonding of the life of managers/spans to that of threads, + * it will surface as OOM events. + * @return count of weak references whose reference values were + * nullified. + */ private int createAuditorAndWorkers() throws IOException, InterruptedException, ExecutionException { - final Configuration conf = new Configuration(false); - conf.set(AUDIT_SERVICE_CLASSNAME, MemoryHungryAuditor.NAME); - try (ActiveAuditManagerS3A auditManager = new ActiveAuditManagerS3A(emptyStatisticsStore())){ - auditManager.init(conf); + try (ActiveAuditManagerS3A auditManager = + new ActiveAuditManagerS3A(emptyStatisticsStore())) { + auditManager.init(createMemoryHungryConfiguration()); auditManager.start(); LOG.info("Using {}", auditManager); - // no guarentee every thread gets used + auditManagers.add(new WeakReference<>(auditManager)); + + // no guarantee every thread gets used, so track + // in a set. 
This will give us the thread ID of every + // entry in the map. List> futures = new ArrayList<>(THREAD_COUNT); Set threadIds = new HashSet<>(); + + // perform the spanning operation in a long lived thread. for (int i = 0; i < THREAD_COUNT; i++) { futures.add(workers.submit(() -> spanningOperation(auditManager))); } + // get the results and so determine the thread IDs for (Future future : futures) { final Result r = future.get(); threadIds.add(r.getThreadId()); } + final int threadsUsed = threadIds.size(); + final Long[] threadIdArray = threadIds.toArray(new Long[0]); + // get rid of any references to spans other than in the weak ref map futures = null; + // gc System.gc(); - final int pruned = auditManager.prune(); - LOG.info("{} executed across {} threads and pruned {} entries", - auditManager, threadIds.size(), pruned); - return pruned; + // get the span map + final WeakReferenceThreadMap spanMap + = auditManager.getActiveSpanMap(); + + // count number of spans removed + final long derefenced = threadIds.stream() + .filter((id) -> !spanMap.containsKey(id)) + .count(); + if (derefenced > 0) { + LOG.info("{} executed across {} threads and dereferenced {} entries", + auditManager, threadsUsed, derefenced); + } + + // resolve not quite all of the threads. + // why not all? leaves at least one for pruning + // but it does complicate some of the assertions... 
+ int spansRecreated = 0; + int subset = threadIdArray.length - 1; + LOG.info("Resolving {} thread references", subset); + for (int i = 0; i < subset; i++) { + final long id = threadIdArray[i]; + final boolean present = spanMap.containsKey(id); + Assertions.assertThat(spanMap.get(id)) + .describedAs("Span map entry for thread %d", id) + .isNotNull(); + if (!present) { + spansRecreated++; + } + } + LOG.info("Recreated {} spans", subset); + + // if the number of spans lost is more than the number + // of entries not probed, then at least one span was + // recreated + if (derefenced > threadIdArray.length - subset) { + Assertions.assertThat(spansRecreated) + .describedAs("number of recreated spans") + .isGreaterThan(0); + } + + // now prune. + int pruned = auditManager.prune(); + if (pruned > 0) { + LOG.info("{} executed across {} threads and pruned {} entries", + auditManager, threadsUsed, pruned); + } + Assertions.assertThat(pruned) + .describedAs("Count of references pruned") + .isEqualTo(derefenced - spansRecreated); + return pruned + (int)derefenced; } } + private Configuration createMemoryHungryConfiguration() { + final Configuration conf = new Configuration(false); + conf.set(AUDIT_SERVICE_CLASSNAME, MemoryHungryAuditor.NAME); + return conf; + } + /** * The operation in each worker thread. 
* @param auditManager audit manager @@ -163,16 +276,16 @@ private static final class Result { private final long threadId; private final AuditSpanS3A auditSpan; - public Result(final long threadId, final AuditSpanS3A auditSpan) { + private Result(final long threadId, final AuditSpanS3A auditSpan) { this.threadId = threadId; this.auditSpan = auditSpan; } - public long getThreadId() { + private long getThreadId() { return threadId; } - public AuditSpanS3A getAuditSpan() { + private AuditSpanS3A getAuditSpan() { return auditSpan; } } diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 637d015a091f2..8442d9ff4b207 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -82,3 +82,6 @@ log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO #log4j.logger.org.apache.hadoop.fs.s3a.audit=DEBUG # log request creation, span lifecycle and other low-level details #log4j.logger.org.apache.hadoop.fs.s3a.audit=TRACE + +# uncomment this to trace where context entries are set +# log4j.logger.org.apache.hadoop.fs.audit.CommonAuditContext=TRACE \ No newline at end of file From 6b6a621a4423e97e2777733e286cf94077e6ee2e Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 27 Jan 2022 13:39:42 +0000 Subject: [PATCH 07/12] HADOOP-18091. Add regular prunings on span deactivation If, somehow, spans were not being deactivated from threads which were then destroyed, then the map would fill up with weak references to those spans. They wouldn't stop GC-ing of the audit manager, but they would fill up the hash table. Now, when a span is deactivated: 1. its entry is removed from the map (not replaced with the unbounded one, as that will happen on demand) 2. when the unbounded span is active on a thread, its deactivate() call also removes it from the map. 3. 
every so often (PRUNE_THRESHOLD = 10000), a full prune of the reference map is performed. This will remove any null references. Because of 1 & 2, when valid or unbound spans are deactivated, the map shrinks. Because of 3, even if threads are destroyed with valid spans active, after a GC the fact that they have been GC'd will surface. This doesn't remove map entries with unbonded spans active as the strong references held to them mean they never get GC'd. We could do that with a prune() in the audit manager removing all entries where !isValid() is true. Change-Id: I62fc9d66216dd8fdafb22d8b57dcdd6d311915e3 --- .../fs/impl/WeakReferenceThreadMap.java | 2 +- .../apache/hadoop/util/WeakReferenceMap.java | 5 +- .../s3a/audit/impl/ActiveAuditManagerS3A.java | 74 +++++++-- .../TestActiveAuditManagerThreadLeakage.java | 154 +++++++++++++++--- 4 files changed, 195 insertions(+), 40 deletions(-) rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/{ => impl}/TestActiveAuditManagerThreadLeakage.java (66%) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java index a00e3fa47f425..2f6a62e92afc0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java @@ -43,7 +43,7 @@ public V removeForCurrentThread() { return remove(curentThreadId()); } - private long curentThreadId() { + public long curentThreadId() { return Thread.currentThread().getId(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java index 323909fe4ece0..cdc85dcb7cafa 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java @@ -57,7 +57,6 @@ public class WeakReferenceMap { */ private final Map> map = new ConcurrentHashMap<>(); - /** * Supplier of new instances. */ @@ -102,6 +101,10 @@ public String toString() { '}'; } + /** + * Map size. + * @return the current map size. + */ public int size() { return map.size(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java index 14fb3e679a1ad..46221cdf03e5a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java @@ -23,6 +23,7 @@ import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.HandlerContextAware; @@ -33,7 +34,6 @@ import com.amazonaws.handlers.HandlerBeforeAttemptContext; import com.amazonaws.handlers.RequestHandler2; import com.amazonaws.http.HttpResponse; -import com.amazonaws.services.s3.transfer.Transfer; import com.amazonaws.services.s3.transfer.internal.TransferStateChangeListener; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -118,6 +118,13 @@ public final class ActiveAuditManagerS3A public static final String NOT_A_WRAPPED_SPAN = "Span attached to request is not a wrapped span"; + /** + * Arbritary threshold for triggering pruning on deactivation. + * High enough it doesn't happen very often, low enough + * that it will happen regularly on a busy system. + */ + static final int PRUNE_THRESHOLD = 10000; + /** * Audit service. 
*/ @@ -134,6 +141,18 @@ public final class ActiveAuditManagerS3A */ private WrappingAuditSpan unbondedSpan; + /** + * How many spans have to be deactivated before a prune is triggered? + * Fixed as a constant for now unless/until some pressing need + * for it to be made configurable ever surfaces. + */ + private final int pruneThreshold = PRUNE_THRESHOLD; + + /** + * Count down to next pruning. + */ + private final AtomicInteger deactivationsBeforePrune = new AtomicInteger(); + /** * Thread local span. This defaults to being * the unbonded span. @@ -157,6 +176,7 @@ public final class ActiveAuditManagerS3A public ActiveAuditManagerS3A(final IOStatisticsStore iostatistics) { super("ActiveAuditManagerS3A"); this.ioStatisticsStore = iostatistics; + this.deactivationsBeforePrune.set(pruneThreshold); } @Override @@ -289,17 +309,38 @@ private void noteSpanReferenceLost(long threadId) { * non-atomic and non-blocking. * @return the number of entries pruned. */ - public int prune() { + @VisibleForTesting + int prune() { return activeSpanMap.prune(); } + /** + * remove the span from the reference map, shrinking the map in the process. + * if/when a new span is activated in the thread, a new entry will be created. + * and if queried for a span, the unbounded span will be automatically + * added to the map for this thread ID. + * + */ + @VisibleForTesting + boolean removeActiveSpanFromMap() { + // remove from the map + activeSpanMap.removeForCurrentThread(); + if (deactivationsBeforePrune.decrementAndGet() == 0) { + // trigger a prune + activeSpanMap.prune(); + deactivationsBeforePrune.set(pruneThreshold); + return true; + } + return false; + } + /** * Get the map of threads to active spans; allows * for testing of weak reference resolution after GC. 
* @return the span map */ @VisibleForTesting - public WeakReferenceThreadMap getActiveSpanMap() { + WeakReferenceThreadMap getActiveSpanMap() { return activeSpanMap; } @@ -380,13 +421,7 @@ public List createRequestHandlers() @Override public TransferStateChangeListener createStateChangeListener() { final WrappingAuditSpan span = activeSpan(); - return new TransferStateChangeListener() { - @Override - public void transferStateChanged(final Transfer transfer, - final Transfer.TransferState state) { - switchToActiveSpan(span); - } - }; + return (transfer, state) -> switchToActiveSpan(span); } @Override @@ -690,16 +725,21 @@ public AuditSpanS3A activate() { */ @Override public void deactivate() { - // no-op for invalid spans, + + // span is inactive; ignore + if (!isActive()) { + return; + } + // skipped for invalid spans, // so as to prevent the unbounded span from being closed // and everything getting very confused. - if (!isValid || !isActive()) { - return; + if (isValid) { + // deactivate the span + span.deactivate(); } - // deactivate the span - span.deactivate(); - // and go to the unbounded one. - switchToActiveSpan(getUnbondedSpan()); + // remove the span from the reference map, + // sporadically triggering a prune operation. 
+ removeActiveSpanFromMap(); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java similarity index 66% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java index f32540245dd9a..0f69f95c8cbf0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestActiveAuditManagerThreadLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.s3a.audit; +package org.apache.hadoop.fs.s3a.audit.impl; import java.io.IOException; import java.lang.ref.WeakReference; @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -37,13 +38,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; -import org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A; +import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.audit.MemoryHungryAuditor; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_SERVICE_CLASSNAME; +import static org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A.PRUNE_THRESHOLD; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore; - /** * This test attempts to recreate the OOM problems of * HADOOP-18091. 
S3A auditing leaks memory through ThreadLocal references @@ -67,9 +69,17 @@ public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase private static final Logger LOG = LoggerFactory.getLogger(TestActiveAuditManagerThreadLeakage.class); + /** how many managers to sequentially create. */ private static final int MANAGER_COUNT = 500; + + /** size of long lived hread pool. */ private static final int THREAD_COUNT = 20; private ExecutorService workers; + + /** + * count of prunings which have taken place in the manager lifecycle + * operations. + */ private int pruneCount; /** @@ -126,8 +136,8 @@ public void testMemoryLeak() throws Throwable { // some must have been GC'd, showing that no other // references are being retained internally. Assertions.assertThat(auditManagers.stream() - .filter((r) -> r.get() == null) - .count()) + .filter((r) -> r.get() == null) + .count()) .describedAs("number of audit managers garbage collected") .isNotZero(); } @@ -135,7 +145,7 @@ public void testMemoryLeak() throws Throwable { /** * Create, use and then shutdown one auditor in a unique thread. * @return memory consumed/released -f */ + */ private long createAndTestOneAuditor() throws Exception { long original = Runtime.getRuntime().freeMemory(); ExecutorService factory = Executors.newSingleThreadExecutor(); @@ -145,14 +155,13 @@ private long createAndTestOneAuditor() throws Exception { factory.shutdown(); factory.awaitTermination(60, TimeUnit.SECONDS); - final long current = Runtime.getRuntime().freeMemory(); return current - original; } /** - * This is the core of the test. + * This is the core of the leakage test. * Create an audit manager and spans across multiple threads. * The spans are created in the long-lived pool, so if there is * any bonding of the life of managers/spans to that of threads, @@ -173,25 +182,29 @@ private int createAuditorAndWorkers() // in a set. This will give us the thread ID of every // entry in the map. 
- List> futures = new ArrayList<>(THREAD_COUNT); Set threadIds = new HashSet<>(); - // perform the spanning operation in a long lived thread. - for (int i = 0; i < THREAD_COUNT; i++) { - futures.add(workers.submit(() -> spanningOperation(auditManager))); - } + // use a code block here so that futures becomes null after use + // without having to set it to null and then the IDE/spotbugs + // complaining about unused assignments + { + List> futures; + futures = new ArrayList<>(THREAD_COUNT); + + // perform the spanning operation in a long lived thread. + for (int i = 0; i < THREAD_COUNT; i++) { + futures.add(workers.submit(() -> spanningOperation(auditManager))); + } - // get the results and so determine the thread IDs - for (Future future : futures) { - final Result r = future.get(); - threadIds.add(r.getThreadId()); + // get the results and so determine the thread IDs + for (Future future : futures) { + final Result r = future.get(); + threadIds.add(r.getThreadId()); + } } final int threadsUsed = threadIds.size(); final Long[] threadIdArray = threadIds.toArray(new Long[0]); - // get rid of any references to spans other than in the weak ref map - futures = null; - // gc System.gc(); // get the span map @@ -215,10 +228,18 @@ private int createAuditorAndWorkers() LOG.info("Resolving {} thread references", subset); for (int i = 0; i < subset; i++) { final long id = threadIdArray[i]; + + // note whether or not the span is present final boolean present = spanMap.containsKey(id); + + // get the the span for that ID. which must never be + // null Assertions.assertThat(spanMap.get(id)) .describedAs("Span map entry for thread %d", id) .isNotNull(); + + // if it wasn't present, the unbounded span must therefore have been + // bounded to this map entry. 
if (!present) { spansRecreated++; } @@ -243,7 +264,7 @@ private int createAuditorAndWorkers() Assertions.assertThat(pruned) .describedAs("Count of references pruned") .isEqualTo(derefenced - spansRecreated); - return pruned + (int)derefenced; + return pruned + (int) derefenced; } } @@ -268,12 +289,19 @@ private Result spanningOperation(final ActiveAuditManagerS3A auditManager) Assertions.assertThat(auditSpan) .describedAs("audit span for current thread") .isNotNull(); + // this is needed to ensure that more of the thread pool is used up Thread.yield(); return new Result(Thread.currentThread().getId(), auditSpan); } + /** + * Result of the spanning operation. + */ private static final class Result { + /** thread operation took place in. */ private final long threadId; + + /** generated span. not currently used in tests. */ private final AuditSpanS3A auditSpan; private Result(final long threadId, final AuditSpanS3A auditSpan) { @@ -289,4 +317,88 @@ private AuditSpanS3A getAuditSpan() { return auditSpan; } } + + /** + * Verifu that pruning takes place intermittently. 
+ */ + @Test + public void testRegularPruning() throws Throwable { + try (ActiveAuditManagerS3A auditManager = + new ActiveAuditManagerS3A(emptyStatisticsStore())) { + auditManager.init(createMemoryHungryConfiguration()); + auditManager.start(); + // get the span map + final WeakReferenceThreadMap spanMap + = auditManager.getActiveSpanMap(); + // add a null entry at a thread ID other than this one + spanMap.put(Thread.currentThread().getId() + 1, null); + + // remove this span enough times that pruning shall take + // place twice + // this verifies that pruning takes place and that the + // counter is reset + int pruningCount = 0; + for (int i = 0; i < PRUNE_THRESHOLD * 2 + 1; i++) { + boolean pruned = auditManager.removeActiveSpanFromMap(); + if (pruned) { + pruningCount++; + } + } + // pruning must have taken place + Assertions.assertThat(pruningCount) + .describedAs("Intermittent pruning count") + .isEqualTo(2); + } + } + + /** + * Verify span deactivation removes the entry from the map. 
+ */ + @Test + public void testSpanDeactivationRemovesEntryFromMap() throws Throwable { + try (ActiveAuditManagerS3A auditManager = + new ActiveAuditManagerS3A(emptyStatisticsStore())) { + auditManager.init(createMemoryHungryConfiguration()); + auditManager.start(); + // get the span map + final WeakReferenceThreadMap spanMap + = auditManager.getActiveSpanMap(); + final AuditSpanS3A auditSpan = + auditManager.createSpan("span", null, null); + Assertions.assertThat(auditManager.getActiveAuditSpan()) + .describedAs("active span") + .isSameAs(auditSpan); + // this assert gets used repeatedly, so define a lambda-exp + // which can be envoked with different arguments + Consumer assertMapHasKey = expected -> + Assertions.assertThat(spanMap.containsKey(spanMap.curentThreadId())) + .describedAs("map entry for current thread") + .isEqualTo(expected); + + // sets the span to null + auditSpan.deactivate(); + + // there's no entry + assertMapHasKey.accept(false); + + // asking for the current span will return the unbonded one + final AuditSpanS3A newSpan = auditManager.getActiveAuditSpan(); + Assertions.assertThat(newSpan) + .describedAs("active span") + .isNotNull() + .matches(s -> !s.isValidSpan()); + // which is in the map + // there's an entry + assertMapHasKey.accept(true); + + // deactivating the old span does nothing + auditSpan.deactivate(); + assertMapHasKey.accept(true); + + // deactivating the current unbounded span does + // remove the entry + newSpan.deactivate(); + assertMapHasKey.accept(false); + } + } } From bb6f5c78db805a711777d85da34308791dc6c98e Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 27 Jan 2022 15:48:06 +0000 Subject: [PATCH 08/12] HADOOP-18091. renabling auditing. Turning auditing back on. 
Change-Id: I46032e467cc399a0f292f756a4bf7d1fd5e30352 --- .../hadoop-common/src/main/resources/core-default.xml | 11 +++++++++++ .../apache/hadoop/fs/s3a/audit/S3AAuditConstants.java | 4 ++-- .../fs/s3a/audit/impl/ActiveAuditManagerS3A.java | 3 ++- .../src/site/markdown/tools/hadoop-aws/auditing.md | 8 +++----- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 26126f14c5602..7e2023603cd8f 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2025,6 +2025,17 @@ + + + fs.s3a.audit.enabled + true + + Should auditing of S3A requests be enabled? + + + fs.AbstractFileSystem.wasb.impl diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java index 09ff0cbfd7c1c..ad0ee36fe11d8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java @@ -44,7 +44,7 @@ private S3AAuditConstants() { * Default auditing flag. * Value: {@value}. */ - public static final boolean AUDIT_ENABLED_DEFAULT = false; + public static final boolean AUDIT_ENABLED_DEFAULT = true; /** @@ -57,7 +57,7 @@ private S3AAuditConstants() { * Classname of the logging auditor: {@value}. */ public static final String LOGGING_AUDIT_SERVICE = - "org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor"; + "org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor"; /** * Classname of the No-op auditor: {@value}. 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java index 46221cdf03e5a..2c7a506bf5521 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java @@ -122,8 +122,9 @@ public final class ActiveAuditManagerS3A * Arbritary threshold for triggering pruning on deactivation. * High enough it doesn't happen very often, low enough * that it will happen regularly on a busy system. + * Value: {@value}. */ - static final int PRUNE_THRESHOLD = 10000; + static final int PRUNE_THRESHOLD = 10_000; /** * Audit service. diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md index 17f0c9ad54326..7c004627357b4 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md @@ -32,10 +32,9 @@ See [HADOOP-18091](https://issues.apache.org/jira/browse/HADOOP-18091) _S3A audi To avoid these memory leaks, auditing was disabled by default in the hadoop 3.3.2 release. -Although these memory leaks have been fixed, for safety and consistency, -auditing is disabled by default. +As these memory leaks have now been fixed, auditing has been re-enabled. -To turn auditing on, set `fs.s3a.audit.enabled` to `true`. +To disable it, set `fs.s3a.audit.enabled` to `false`. ## Auditing workflow @@ -87,7 +86,7 @@ Other auditor classes may be used instead. | Option | Meaning | Default Value | |--------|---------|---------------| -| `fs.s3a.audit.enabled` | Is auditing enabled | `false` | +| `fs.s3a.audit.enabled` | Is auditing enabled? 
| `true` | | `fs.s3a.audit.service.classname` | Auditor classname | `org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor` | | `fs.s3a.audit.request.handlers` | List of extra subclasses of AWS SDK RequestHandler2 to include in handler chain | `""` | | `fs.s3a.audit.referrer.enabled` | Logging auditor to publish the audit information in the HTTP Referrer header | `true` | @@ -141,7 +140,6 @@ The Logging Auditor is enabled by providing its classname in the option ``` - To print auditing events in the local client logs, set the associated Log4J log to log at debug: From d1446cd049ae61b3742f5ea15defb95076bf801b Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 28 Jan 2022 12:05:57 +0000 Subject: [PATCH 09/12] HADOOP-18091. style and EOLs Change-Id: I4af0a49cc87a0998578c3170e5199665f7f572bc --- .../fs/s3a/audit/S3AAuditConstants.java | 2 +- .../tools/hadoop-aws/auditing_architecture.md | 6 +-- .../TestActiveAuditManagerThreadLeakage.java | 38 +++++++------------ 3 files changed, 18 insertions(+), 28 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java index ad0ee36fe11d8..1d76833f8ceab 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java @@ -57,7 +57,7 @@ private S3AAuditConstants() { * Classname of the logging auditor: {@value}. */ public static final String LOGGING_AUDIT_SERVICE = - "org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor"; + "org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor"; /** * Classname of the No-op auditor: {@value}. 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md index 1238182e7f894..a59620b8d1c2b 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md @@ -129,7 +129,7 @@ The original code was written with the assumption that when the `ActiveAuditMana garbage collected, references in its `ThreadLocal` field would be freed. In fact, they are retained until all threads with references are terminated. If any long-lived thread had performed an s3 operation which created a span, -a reference back to the audit manager instance was created +a reference back to the audit manager instance was created *whose lifetime was that of the thread* In short-lived processes, and long-lived processes where a limited set of @@ -163,7 +163,7 @@ There is therefore a theoretical risk that if a garbage collection takes place d execution of a spanned operation, the reference will be lost. This is not considered an issue as all bounded entry points into the S3A filesystem -retain a strong reference to their audit span. +retain a strong reference to their audit span. All entry points which return an object which can invoke s3 operations (input and output streams, list iterators, etc.) also retain a strong reference to their span, a reference @@ -174,7 +174,7 @@ Except in deployments where `fs.s3a.audit.reject.out.of.span.operations` is true invoking S3 operations within the unbounded span are permitted. That option is set to `true` within S3A test suites. Therefore it is unlikely that any operations are invoked in unbounded spans except -for the special case of copy operations invoked by the transfer manager threads. +for the special case of copy operations invoked by the transfer manager threads. 
Those are already ignored in the logging auditor, whose unbounded span ignores requests which `AWSRequestAnalyzer.isRequestNotAlwaysInSpan()` indicates may happen outside of a span. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java index 0f69f95c8cbf0..0696bc0b664f9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java @@ -84,7 +84,7 @@ public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase /** * As audit managers are created they are added to the list, - * so we can verify they get GC'd.` + * so we can verify they get GC'd. */ private final List> auditManagers = new ArrayList<>(); @@ -184,24 +184,19 @@ private int createAuditorAndWorkers() Set threadIds = new HashSet<>(); - // use a code block here so that futures becomes null after use - // without having to set it to null and then the IDE/spotbugs - // complaining about unused assignments - { - List> futures; - futures = new ArrayList<>(THREAD_COUNT); + List> futures = new ArrayList<>(THREAD_COUNT); - // perform the spanning operation in a long lived thread. - for (int i = 0; i < THREAD_COUNT; i++) { - futures.add(workers.submit(() -> spanningOperation(auditManager))); - } + // perform the spanning operation in a long lived thread. 
+ for (int i = 0; i < THREAD_COUNT; i++) { + futures.add(workers.submit(() -> spanningOperation(auditManager))); + } - // get the results and so determine the thread IDs - for (Future future : futures) { - final Result r = future.get(); - threadIds.add(r.getThreadId()); - } + // get the results and so determine the thread IDs + for (Future future : futures) { + final Result r = future.get(); + threadIds.add(r.getThreadId()); } + final int threadsUsed = threadIds.size(); final Long[] threadIdArray = threadIds.toArray(new Long[0]); @@ -291,7 +286,7 @@ private Result spanningOperation(final ActiveAuditManagerS3A auditManager) .isNotNull(); // this is needed to ensure that more of the thread pool is used up Thread.yield(); - return new Result(Thread.currentThread().getId(), auditSpan); + return new Result(Thread.currentThread().getId()); } /** @@ -301,21 +296,16 @@ private static final class Result { /** thread operation took place in. */ private final long threadId; - /** generated span. not currently used in tests. */ - private final AuditSpanS3A auditSpan; - private Result(final long threadId, final AuditSpanS3A auditSpan) { + private Result(final long threadId) { this.threadId = threadId; - this.auditSpan = auditSpan; } private long getThreadId() { return threadId; } - private AuditSpanS3A getAuditSpan() { - return auditSpan; - } + } /** From ec7fac67ce49afeb5ad5aa4f512acc08a45fff41 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 31 Jan 2022 11:09:00 +0000 Subject: [PATCH 10/12] HADOOP-18091. 
fix EOL Change-Id: I6b2c76131e520081c649a89a638b03b2398780e1 --- .../src/site/markdown/tools/hadoop-aws/auditing_architecture.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md index a59620b8d1c2b..dc91063ac1d11 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md @@ -145,7 +145,7 @@ Where it did have problems was when the following conditions were met from the cache of active instances. 4. Or, after a query for a specific user was completed, `FileSystem.closeAllForUGI(UserGroupInformation)` was invoked to remove all - cached FS instances of that user. + cached FS instances of that user. Conditions 1, 2 and 4 are exactly those which long-lived Hive services can encounter. From 06154c505ce2f8c1b35dd87577ba00d98659f514 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 8 Feb 2022 15:44:48 +0000 Subject: [PATCH 11/12] HADOOP-18091. 
feedback from PR review * typos and nits * cleanup in test teardown * tuning docs Change-Id: I088f050fa34b097266b70f3e078da89e083dfdf9 --- .../fs/impl/WeakReferenceThreadMap.java | 8 +++--- .../tools/hadoop-aws/auditing_architecture.md | 9 +++---- .../TestActiveAuditManagerThreadLeakage.java | 26 ++++++++++++++----- .../src/test/resources/log4j.properties | 2 +- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java index 2f6a62e92afc0..b24bef2a816bf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java @@ -36,19 +36,19 @@ public WeakReferenceThreadMap(final Function factory, } public V getForCurrentThread() { - return get(curentThreadId()); + return get(currentThreadId()); } public V removeForCurrentThread() { - return remove(curentThreadId()); + return remove(currentThreadId()); } - public long curentThreadId() { + public long currentThreadId() { return Thread.currentThread().getId(); } public V setForCurrentThread(V newVal) { - return put(curentThreadId(), newVal); + return put(currentThreadId(), newVal); } } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md index dc91063ac1d11..aa35fae92cebf 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md @@ -153,7 +153,7 @@ encounter. 
_Auditing was disabled by default until a fix was implemented._ The memory leak has been fixed using a new class `org.apache.hadoop.util.WeakReferenceMap` -to store a map of thread IDs to active spans. When the S3A fileystem is closed, +to store a map of thread IDs to active spans. When the S3A filesystem is closed, its audit manager service is stopped and all references to spans removed from the map of thread ID to span. @@ -179,7 +179,7 @@ Those are already ignored in the logging auditor, whose unbounded span ignores requests which `AWSRequestAnalyzer.isRequestNotAlwaysInSpan()` indicates may happen outside of a span. This is restricted to bucket location probes possibly performed by the SDK -on instantiatio, and copy part/complete calls. +on instantiation, and copy part/complete calls. ```java @@ -191,9 +191,6 @@ on instantiatio, and copy part/complete calls. } ``` -Auditing is still disabled by default for consistency with previous releases. - - ### Class `org.apache.hadoop.fs.audit.CommonAuditContext` @@ -232,7 +229,7 @@ If memory leakage is suspected here, set the log `org.apache.hadoop.fs.audit.CommonAuditContext` to `TRACE` to log the origin of operations which add log entries. -This will produce a log entry whose strack trace will show the caller chain.f +This will produce a log entry whose stack trace will show the caller chain. 
``` 2022-01-26 16:10:28,384 TRACE audit.CommonAuditContext (CommonAuditContext.java:put(149)) - Adding context entry t1 java.lang.Exception: t1 diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java index 0696bc0b664f9..d156947db9474 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java @@ -32,6 +32,7 @@ import java.util.function.Consumer; import org.assertj.core.api.Assertions; +import org.junit.After; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,7 @@ public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase /** how many managers to sequentially create. */ private static final int MANAGER_COUNT = 500; - /** size of long lived hread pool. */ + /** size of long lived thread pool. */ private static final int THREAD_COUNT = 20; private ExecutorService workers; @@ -89,6 +90,14 @@ public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase private final List> auditManagers = new ArrayList<>(); + @After + public void teardown() { + if (workers != null) { + workers.shutdown(); + } + } + + /** * When the service is stopped, the span map is * cleared immediately. @@ -126,7 +135,7 @@ public void testMemoryLeak() throws Throwable { // but if memory allocation in test runs increase, it // may cease to hold. 
in which case: create more // audit managers - LOG.info("Totel prune count {}", pruneCount); + LOG.info("Total prune count {}", pruneCount); Assertions.assertThat(pruneCount) .describedAs("Totel prune count") @@ -150,10 +159,13 @@ private long createAndTestOneAuditor() throws Exception { long original = Runtime.getRuntime().freeMemory(); ExecutorService factory = Executors.newSingleThreadExecutor(); - pruneCount += factory.submit(this::createAuditorAndWorkers).get(); + try { + pruneCount += factory.submit(this::createAuditorAndWorkers).get(); + } finally { + factory.shutdown(); + factory.awaitTermination(60, TimeUnit.SECONDS); + } - factory.shutdown(); - factory.awaitTermination(60, TimeUnit.SECONDS); final long current = Runtime.getRuntime().freeMemory(); return current - original; @@ -309,7 +321,7 @@ private long getThreadId() { } /** - * Verifu that pruning takes place intermittently. + * Verify that pruning takes place intermittently. */ @Test public void testRegularPruning() throws Throwable { @@ -361,7 +373,7 @@ public void testSpanDeactivationRemovesEntryFromMap() throws Throwable { // this assert gets used repeatedly, so define a lambda-exp // which can be envoked with different arguments Consumer assertMapHasKey = expected -> - Assertions.assertThat(spanMap.containsKey(spanMap.curentThreadId())) + Assertions.assertThat(spanMap.containsKey(spanMap.currentThreadId())) .describedAs("map entry for current thread") .isEqualTo(expected); diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 8442d9ff4b207..fc287e9845c76 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -84,4 +84,4 @@ log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO #log4j.logger.org.apache.hadoop.fs.s3a.audit=TRACE # uncomment this to trace where context entries are set -# 
log4j.logger.org.apache.hadoop.fs.audit.CommonAuditContext=TRACE \ No newline at end of file +# log4j.logger.org.apache.hadoop.fs.audit.CommonAuditContext=TRACE From 554b55f1e8f189cf354c8206a1291a89dd80021a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 9 Feb 2022 19:04:12 +0000 Subject: [PATCH 12/12] HADOOP-18091. final typos Change-Id: I1809cec29d9354185d759e24e65688a44d50cb89 --- .../apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java | 2 +- .../fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java index 2c7a506bf5521..3d2102d305c7d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java @@ -119,7 +119,7 @@ public final class ActiveAuditManagerS3A = "Span attached to request is not a wrapped span"; /** - * Arbritary threshold for triggering pruning on deactivation. + * Arbitrary threshold for triggering pruning on deactivation. * High enough it doesn't happen very often, low enough * that it will happen regularly on a busy system. * Value: {@value}. 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java index d156947db9474..901347d29d87b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java @@ -138,7 +138,7 @@ public void testMemoryLeak() throws Throwable { LOG.info("Total prune count {}", pruneCount); Assertions.assertThat(pruneCount) - .describedAs("Totel prune count") + .describedAs("Total prune count") .isNotZero(); // now count number of audit managers GC'd