diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java
new file mode 100644
index 00000000000000..04e39875b60bfc
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import javax.annotation.Nullable;
+
+/**
+ * Holds a reference to an object, to be attached to a stream or a store, so
+ * that the referenced object is not garbage collected while still in use.
+ */
+public class BackReference {
+ private final Object reference;
+
+ public BackReference(@Nullable Object reference) {
+ this.reference = reference;
+ }
+
+ /**
+ * Is the reference null?
+ * @return true if the reference is null, else false.
+ */
+ public boolean isNull() {
+ return reference == null;
+ }
+
+ @Override
+ public String toString() {
+ return "BackReference{" +
+ "reference=" + reference +
+ '}';
+ }
+}
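For orientation, here is a minimal sketch (not part of the patch; the OwningStream name is hypothetical) of how a stream can pin its owning object with this class, mirroring the way AbfsOutputStream and AbfsInputStream hold a fsBackRef later in this change:

```java
import org.apache.hadoop.fs.impl.BackReference;

// Hypothetical stream-like class that keeps its creating FileSystem strongly
// reachable for as long as the stream itself is reachable.
class OwningStream {
  private final BackReference fsBackRef;

  OwningStream(Object owningFileSystem) {
    // Holding the owner here prevents it from being garbage collected (and its
    // shared thread pools torn down) while the stream is still in use.
    this.fsBackRef = new BackReference(owningFileSystem);
  }

  boolean hasOwner() {
    return !fsBackRef.isNull();
  }
}
```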
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 438e2df1372065..dd543deb8a5a53 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2321,6 +2321,15 @@ The switch to turn S3A auditing on or off.
+
+<property>
+  <name>ipc.server.handler.queue.size</name>
+  <value>100</value>
+  <description>
+    Indicates how many calls per handler are allowed in the queue. The maximum
+    call queue size is this value multiplied by the number of handler threads.
+  </description>
+</property>
  <name>ipc.server.listen.queue.size</name>
  <value>256</value>
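As a worked example of the description above, a sketch of how the effective call-queue capacity follows from this setting; the handler-thread count of 10 is purely illustrative:

```java
import org.apache.hadoop.conf.Configuration;

public class CallQueueSizeExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Per-handler queue size, defaulting to the value added above.
    int perHandlerQueueSize = conf.getInt("ipc.server.handler.queue.size", 100);
    int handlerThreads = 10; // illustrative handler count, not a default
    // Maximum call queue size = handler threads x per-handler queue size.
    System.out.println("max call queue size = " + (handlerThreads * perHandlerQueueSize)); // 1000
  }
}
```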
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
index 073576546c790e..1611c3c9ce532a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
@@ -308,13 +308,6 @@ Map<String, BPOfferService> getBpByNameserviceId() {
return bpByNameserviceId;
}
- boolean isSlownodeByNameserviceId(String nsId) {
- if (bpByNameserviceId.containsKey(nsId)) {
- return bpByNameserviceId.get(nsId).isSlownode();
- }
- return false;
- }
-
boolean isSlownodeByBlockPoolId(String bpId) {
if (bpByBlockPoolId.containsKey(bpId)) {
return bpByBlockPoolId.get(bpId).isSlownode();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 41088cf59ce4b2..2096f18d31abff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -4240,10 +4240,6 @@ public DataSetLockManager getDataSetLockManager() {
return dataSetLockManager;
}
- boolean isSlownodeByNameserviceId(String nsId) {
- return blockPoolManager.isSlownodeByNameserviceId(nsId);
- }
-
boolean isSlownodeByBlockPoolId(String bpId) {
return blockPoolManager.isSlownodeByBlockPoolId(bpId);
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 5fb2c6e1700a81..426ad8ca1e1912 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -45,6 +45,7 @@
import javax.annotation.Nullable;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
@@ -155,6 +156,9 @@ public class AzureBlobFileSystem extends FileSystem
/** Rate limiting for operations which use it to throttle their IO. */
private RateLimiting rateLimiting;
+ /** The full path URI, stored for better logging. */
+ private URI fullPathUri;
+
@Override
public void initialize(URI uri, Configuration configuration)
throws IOException {
@@ -165,7 +169,7 @@ public void initialize(URI uri, Configuration configuration)
setConf(configuration);
LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
-
+ this.fullPathUri = uri;
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
abfsCounters = new AbfsCountersImpl(uri);
// name of the blockFactory to be used.
@@ -192,6 +196,7 @@ public void initialize(URI uri, Configuration configuration)
.withAbfsCounters(abfsCounters)
.withBlockFactory(blockFactory)
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
+ .withBackReference(new BackReference(this))
.build();
this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder);
@@ -236,7 +241,7 @@ public void initialize(URI uri, Configuration configuration)
public String toString() {
final StringBuilder sb = new StringBuilder(
"AzureBlobFileSystem{");
- sb.append("uri=").append(uri);
+ sb.append("uri=").append(fullPathUri);
sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 79ffc796c3ae59..5c06270fa5bb81 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -55,6 +55,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
@@ -189,6 +190,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;
+ /** ABFS instance reference to be held by the store to avoid GC close. */
+ private BackReference fsBackRef;
+
/**
* FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations.
* Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters
@@ -202,6 +206,7 @@ public AzureBlobFileSystemStore(
String[] authorityParts = authorityParts(uri);
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];
+ this.fsBackRef = abfsStoreBuilder.fsBackRef;
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
@@ -711,6 +716,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withTracingContext(tracingContext)
+ .withAbfsBackRef(fsBackRef)
.build();
}
@@ -818,6 +824,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
abfsConfiguration.shouldReadBufferSizeAlways())
.withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
.withBufferedPreadDisabled(bufferedPreadDisabled)
+ .withAbfsBackRef(fsBackRef)
.build();
}
@@ -1871,6 +1878,7 @@ public static final class AzureBlobFileSystemStoreBuilder {
private AbfsCounters abfsCounters;
private DataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
+ private BackReference fsBackRef;
public AzureBlobFileSystemStoreBuilder withUri(URI value) {
this.uri = value;
@@ -1906,6 +1914,12 @@ public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks(
return this;
}
+ public AzureBlobFileSystemStoreBuilder withBackReference(
+ BackReference fsBackRef) {
+ this.fsBackRef = fsBackRef;
+ return this;
+ }
+
public AzureBlobFileSystemStoreBuilder build() {
return this;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index fdeaf701775712..86442dac9aaf70 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -26,6 +26,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
@@ -121,6 +122,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
*/
private long nextReadPos;
+ /** ABFS instance to be held by the input stream to avoid GC close. */
+ private final BackReference fsBackRef;
+
public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
@@ -152,6 +156,7 @@ public AbfsInputStream(
this.tracingContext.setStreamID(inputStreamId);
this.context = abfsInputStreamContext;
readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
+ this.fsBackRef = abfsInputStreamContext.getFsBackRef();
// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
@@ -857,4 +862,9 @@ long getFCursorAfterLastRead() {
long getLimit() {
return this.limit;
}
+
+ @VisibleForTesting
+ BackReference getFsBackRef() {
+ return fsBackRef;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index e258958b1a1116..b78a899340f875 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -20,6 +20,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
/**
@@ -51,6 +53,9 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
private boolean bufferedPreadDisabled;
+ /** A BackReference to the FS instance that created this InputStream. */
+ private BackReference fsBackRef;
+
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@@ -122,6 +127,12 @@ public AbfsInputStreamContext withBufferedPreadDisabled(
return this;
}
+ public AbfsInputStreamContext withAbfsBackRef(
+ final BackReference fsBackRef) {
+ this.fsBackRef = fsBackRef;
+ return this;
+ }
+
public AbfsInputStreamContext build() {
if (readBufferSize > readAheadBlockSize) {
LOG.debug(
@@ -180,4 +191,8 @@ public int getReadAheadBlockSize() {
public boolean isBufferedPreadDisabled() {
return bufferedPreadDisabled;
}
+
+ public BackReference getFsBackRef() {
+ return fsBackRef;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 82e20ce5b76842..4268dc3f918a12 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -27,6 +27,7 @@
import java.util.UUID;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
@@ -126,6 +127,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
/** Executor service to carry out the parallel upload requests. */
private final ListeningExecutorService executorService;
+ /** ABFS instance to be held by the output stream to avoid GC close. */
+ private final BackReference fsBackRef;
+
public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
throws IOException {
this.client = abfsOutputStreamContext.getClient();
@@ -147,6 +151,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
this.numOfAppendsToServerSinceLastFlush = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
+ this.fsBackRef = abfsOutputStreamContext.getFsBackRef();
if (this.isAppendBlob) {
this.maxConcurrentRequestCount = 1;
@@ -488,6 +493,12 @@ public synchronized void close() throws IOException {
}
try {
+ // Check if the executor service was shut down before the writes could
+ // be completed.
+ if (hasActiveBlockDataToUpload() && executorService.isShutdown()) {
+ throw new PathIOException(path, "Executor Service closed before "
+ + "writes could be completed.");
+ }
flushInternal(true);
} catch (IOException e) {
// Problems surface in try-with-resources clauses if
@@ -766,4 +777,14 @@ public String toString() {
sb.append("}");
return sb.toString();
}
+
+ @VisibleForTesting
+ BackReference getFsBackRef() {
+ return fsBackRef;
+ }
+
+ @VisibleForTesting
+ ListeningExecutorService getExecutorService() {
+ return executorService;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index ed897330367418..1d1a99c7d9f6f9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.fs.store.DataBlocks;
/**
@@ -65,6 +66,9 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private TracingContext tracingContext;
+ /** A BackReference to the FS instance that created this OutputStream. */
+ private BackReference fsBackRef;
+
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@@ -157,6 +161,12 @@ public AbfsOutputStreamContext withTracingContext(
return this;
}
+ public AbfsOutputStreamContext withAbfsBackRef(
+ final BackReference fsBackRef) {
+ this.fsBackRef = fsBackRef;
+ return this;
+ }
+
public AbfsOutputStreamContext build() {
// Validation of parameters to be done here.
if (streamStatistics == null) {
@@ -261,4 +271,8 @@ public ExecutorService getExecutorService() {
public TracingContext getTracingContext() {
return tracingContext;
}
+
+ public BackReference getFsBackRef() {
+ return fsBackRef;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
index 66f072501dc4db..2ac58fbcb1668e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
@@ -32,6 +32,8 @@
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import org.assertj.core.api.Assertions;
import org.junit.Test;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
@@ -106,6 +108,25 @@ public void testExceptionInOptimization() throws Exception {
}
}
+ /**
+ * Testing the back reference being passed down to AbfsInputStream.
+ */
+ @Test
+ public void testAzureBlobFileSystemBackReferenceInInputStream()
+ throws IOException {
+ Path path = path(getMethodName());
+ // Create a file then open it to verify if this input stream contains any
+ // back reference.
+ try (FSDataOutputStream out = getFileSystem().create(path);
+ FSDataInputStream in = getFileSystem().open(path)) {
+ AbfsInputStream abfsInputStream = (AbfsInputStream) in.getWrappedStream();
+
+ Assertions.assertThat(abfsInputStream.getFsBackRef().isNull())
+ .describedAs("BackReference in input stream should not be null")
+ .isFalse();
+ }
+ }
+
private void testExceptionInOptimization(final FileSystem fs,
final Path testFilePath,
final int seekPos, final int length, final byte[] fileContent)
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index 431c456ae3daaf..eee0c177c33b33 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -18,19 +18,28 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.test.LambdaTestUtils;
/**
* Test create operation.
*/
public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
+
+ private static final int TEST_EXECUTION_TIMEOUT = 2 * 60 * 1000;
private static final String TEST_FILE_PATH = "testfile";
public ITestAbfsOutputStream() throws Exception {
@@ -84,4 +93,73 @@ public void testMaxRequestsAndQueueCapacity() throws Exception {
}
}
+ /**
+ * Verify the passing of AzureBlobFileSystem reference to AbfsOutputStream
+ * to make sure that the FS instance is not eligible for GC while writing.
+ */
+ @Test(timeout = TEST_EXECUTION_TIMEOUT)
+ public void testAzureBlobFileSystemBackReferenceInOutputStream()
+ throws Exception {
+ byte[] testBytes = new byte[5 * 1024];
+ // Create the output stream through an FS instance held only by a separate
+ // method, so that the FS instance becomes eligible for GC once that method
+ // returns: its local variables are then unreachable, which raises the
+ // chance of the FS being garbage collected during the writes below.
+ try (AbfsOutputStream out = getStream()) {
+ // Every 5KB block written is flushed and a GC is hinted. If the
+ // executor service is shut down in between, the test should fail,
+ // indicating a premature shutdown while writing.
+ for (int i = 0; i < 5; i++) {
+ out.write(testBytes);
+ out.flush();
+ System.gc();
+ Assertions.assertThat(
+ out.getExecutorService().isShutdown() || out.getExecutorService()
+ .isTerminated())
+ .describedAs("Executor Service should not be closed before "
+ + "OutputStream while writing")
+ .isFalse();
+ Assertions.assertThat(out.getFsBackRef().isNull())
+ .describedAs("BackReference in output stream should not be null")
+ .isFalse();
+ }
+ }
+ }
+
+ /**
+ * Verify AbfsOutputStream close() behaviour of throwing a PathIOE when the
+ * FS instance is closed before the stream.
+ */
+ @Test
+ public void testAbfsOutputStreamClosingFsBeforeStream()
+ throws Exception {
+ AzureBlobFileSystem fs = new AzureBlobFileSystem();
+ fs.initialize(new URI(getTestUrl()), new Configuration());
+ Path pathFs = path(getMethodName());
+ byte[] inputBytes = new byte[5 * 1024];
+ try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+ pathFs)) {
+ out.write(inputBytes);
+ fs.close();
+ // verify that output stream close after fs.close() would raise a
+ // pathIOE containing the path being written to.
+ LambdaTestUtils
+ .intercept(PathIOException.class, getMethodName(), out::close);
+ }
+ }
+
+ /**
+ * Separate method to create an outputStream using a local FS instance so
+ * that once this method has returned, the FS instance can be eligible for GC.
+ *
+ * @return AbfsOutputStream used for writing.
+ */
+ private AbfsOutputStream getStream() throws URISyntaxException, IOException {
+ AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+ fs1.initialize(new URI(getTestUrl()), new Configuration());
+ Path pathFs1 = path(getMethodName() + "1");
+
+ return createAbfsOutputStreamWithFlushEnabled(fs1, pathFs1);
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 282c271164d9f3..90d734a7ae0d57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4058,6 +4058,20 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
60000; // one minute
+ // AMRMProxy Register UAM Retry-Num
+ public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+ FEDERATION_PREFIX + "amrmproxy.register.uam.retry-count";
+ // When registering a UAM, we will retry a maximum of 3 times by default.
+ public static final int DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+ 3;
+
+ // AMRMProxy Register UAM Retry Interval
+ public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+ FEDERATION_PREFIX + "amrmproxy.register.uam.interval";
+ // Retry Interval, default 100 ms
+ public static final long DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+ TimeUnit.MILLISECONDS.toMillis(100);
+
public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
+ "policy-manager";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-catalog/hadoop-yarn-applications-catalog-webapp/package.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-catalog/hadoop-yarn-applications-catalog-webapp/package.json
index f09442cfc4e87a..59cc3da179fd02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-catalog/hadoop-yarn-applications-catalog-webapp/package.json
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-catalog/hadoop-yarn-applications-catalog-webapp/package.json
@@ -19,6 +19,9 @@
"shelljs": "^0.2.6",
"apidoc": "0.17.7"
},
+ "resolutions": {
+ "triple-beam": "1.3.0"
+ },
"scripts": {
"prestart": "npm install & mvn clean package",
"pretest": "npm install"
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f1ccdb3b56913a..7f669487641af2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5415,6 +5415,15 @@
  <name>yarn.federation.gpg.policy.generator.interval-ms</name>
  <value>1h</value>
+
+  <property>
+    <description>
+      The number of retries for registering a UAM.
+      The default value is 3.
+    </description>
+    <name>yarn.federation.amrmproxy.register.uam.retry-count</name>
+    <value>3</value>
+  </property>
@@ -5439,5 +5448,13 @@
  <name>yarn.federation.gpg.policy.generator.blacklist</name>
+
+  <property>
+    <description>
+      The interval between retries for registering a UAM.
+      The default value is 100ms.
+    </description>
+    <name>yarn.federation.amrmproxy.register.uam.interval</name>
+    <value>100ms</value>
+  </property>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 14a2d60c2b5bc0..32c5bf217e233e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -36,6 +36,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -87,6 +88,7 @@
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
@@ -251,6 +253,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// the maximum wait time for the first async heart beat response
private long heartbeatMaxWaitTimeMs;
+ private int registerUamRetryNum;
+
+ private long registerUamRetryInterval;
+
private boolean waitUamRegisterDone;
private MonotonicClock clock = new MonotonicClock();
@@ -355,6 +361,24 @@ public void init(AMRMProxyApplicationContext appContext) {
this.subClusterTimeOut =
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
}
+
+ this.registerUamRetryNum = conf.getInt(
+ YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+ if (this.registerUamRetryNum <= 0) {
+ LOG.info("{} configured to be {}, should be positive. Using default of {}.",
+ YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+ this.subClusterTimeOut,
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+ this.registerUamRetryNum =
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT;
+ }
+
+ this.registerUamRetryInterval = conf.getTimeDuration(
+ YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+ TimeUnit.MILLISECONDS);
+
this.waitUamRegisterDone = conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE,
YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE);
}
@@ -701,7 +725,7 @@ public AllocateResponse allocate(AllocateRequest request)
if (this.finishAMCalled) {
LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat "
- + "processing and return dummy response" + this.attemptId);
+ + "processing and return dummy response.", this.attemptId);
return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
}
@@ -1255,85 +1279,77 @@ private List<SubClusterId> registerAndAllocateWithNewSubClusters(
// Check to see if there are any new sub-clusters in this request
// list and create and register Unmanaged AM instance for the new ones
List<SubClusterId> newSubClusters = new ArrayList<>();
- for (SubClusterId subClusterId : requests.keySet()) {
- if (!subClusterId.equals(this.homeSubClusterId)
- && !this.uamPool.hasUAMId(subClusterId.getId())) {
- newSubClusters.add(subClusterId);
+ requests.keySet().stream().forEach(subClusterId -> {
+ String id = subClusterId.getId();
+ if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(id)) {
+ newSubClusters.add(subClusterId);
// Set sub-cluster to be timed out initially
- lastSCResponseTime.put(subClusterId,
- clock.getTime() - subClusterTimeOut);
+ lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
}
- }
+ });
this.uamRegisterFutures.clear();
+
for (final SubClusterId scId : newSubClusters) {
- Future> future = this.threadpool.submit(new Runnable() {
- @Override
- public void run() {
- String subClusterId = scId.getId();
-
- // Create a config loaded with federation on and subclusterId
- // for each UAM
- YarnConfiguration config = new YarnConfiguration(getConf());
- FederationProxyProviderUtil.updateConfForFederation(config,
- subClusterId);
-
- RegisterApplicationMasterResponse uamResponse = null;
- Token token = null;
- try {
- ApplicationId applicationId = attemptId.getApplicationId();
- ApplicationSubmissionContext originalSubmissionContext =
- federationFacade.getApplicationSubmissionContext(applicationId);
-
- // For appNameSuffix, use subClusterId of the home sub-cluster
- token = uamPool.launchUAM(subClusterId, config,
- applicationId, amRegistrationResponse.getQueue(),
- getApplicationContext().getUser(), homeSubClusterId.toString(),
- true, subClusterId, originalSubmissionContext);
-
- secondaryRelayers.put(subClusterId,
- uamPool.getAMRMClientRelayer(subClusterId));
-
- uamResponse = uamPool.registerApplicationMaster(subClusterId,
- amRegistrationRequest);
- } catch (Throwable e) {
- LOG.error("Failed to register application master: " + subClusterId
- + " Application: " + attemptId, e);
- // TODO: UAM registration for this sub-cluster RM
- // failed. For now, we ignore the resource requests and continue
- // but we need to fix this and handle this situation. One way would
- // be to send the request to another RM by consulting the policy.
- return;
- }
- uamRegistrations.put(scId, uamResponse);
- LOG.info("Successfully registered unmanaged application master: "
- + subClusterId + " ApplicationId: " + attemptId);
- try {
- uamPool.allocateAsync(subClusterId, requests.get(scId),
- new HeartbeatCallBack(scId, true));
- } catch (Throwable e) {
- LOG.error("Failed to allocate async to " + subClusterId
- + " Application: " + attemptId, e);
- }
+ Future<?> future = this.threadpool.submit(() -> {
- // Save the UAM token in registry or NMSS
- try {
- if (registryClient != null) {
- registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
- subClusterId, token);
- } else if (getNMStateStore() != null) {
- getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
- NMSS_SECONDARY_SC_PREFIX + subClusterId,
- token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
- }
- } catch (Throwable e) {
- LOG.error("Failed to persist UAM token from " + subClusterId
- + " Application: " + attemptId, e);
+ String subClusterId = scId.getId();
+
+ // Create a config loaded with federation on and subclusterId
+ // for each UAM
+ YarnConfiguration config = new YarnConfiguration(getConf());
+ FederationProxyProviderUtil.updateConfForFederation(config, subClusterId);
+ ApplicationId applicationId = attemptId.getApplicationId();
+
+ RegisterApplicationMasterResponse uamResponse;
+ Token<AMRMTokenIdentifier> token;
+
+ // LaunchUAM And RegisterApplicationMaster
+ try {
+ TokenAndRegisterResponse result =
+ ((FederationActionRetry<TokenAndRegisterResponse>) (retryCount) ->
+ launchUAMAndRegisterApplicationMaster(config, subClusterId, applicationId)).
+ runWithRetries(registerUamRetryNum, registerUamRetryInterval);
+
+ token = result.getToken();
+ uamResponse = result.getResponse();
+ } catch (Throwable e) {
+ LOG.error("Failed to register application master: {} Application: {}.",
+ subClusterId, attemptId, e);
+ return;
+ }
+
+ uamRegistrations.put(scId, uamResponse);
+
+ LOG.info("Successfully registered unmanaged application master: {} " +
+ "ApplicationId: {}.", subClusterId, attemptId);
+
+ // Allocate Request
+ try {
+ uamPool.allocateAsync(subClusterId, requests.get(scId),
+ new HeartbeatCallBack(scId, true));
+ } catch (Throwable e) {
+ LOG.error("Failed to allocate async to {} Application: {}.",
+ subClusterId, attemptId, e);
+ }
+
+ // Save the UAM token in registry or NMSS
+ try {
+ if (registryClient != null) {
+ registryClient.writeAMRMTokenForUAM(applicationId, subClusterId, token);
+ } else if (getNMStateStore() != null) {
+ getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
+ NMSS_SECONDARY_SC_PREFIX + subClusterId,
+ token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
}
+ } catch (Throwable e) {
+ LOG.error("Failed to persist UAM token from {} Application {}",
+ subClusterId, attemptId, e);
}
});
+
this.uamRegisterFutures.put(scId, future);
}
@@ -1347,10 +1363,34 @@ public void run() {
}
}
-
return newSubClusters;
}
+ protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster(
+ YarnConfiguration config, String subClusterId, ApplicationId applicationId)
+ throws IOException, YarnException {
+
+ // Prepare parameter information
+ ApplicationSubmissionContext originalSubmissionContext =
+ federationFacade.getApplicationSubmissionContext(applicationId);
+ String submitter = getApplicationContext().getUser();
+ String homeRM = homeSubClusterId.toString();
+ String queue = amRegistrationResponse.getQueue();
+
+ // For appNameSuffix, use subClusterId of the home sub-cluster
+ Token<AMRMTokenIdentifier> token = uamPool.launchUAM(subClusterId, config, applicationId,
+ queue, submitter, homeRM, true, subClusterId, originalSubmissionContext);
+
+ // Set the relationship between SubCluster and AMRMClientRelayer.
+ secondaryRelayers.put(subClusterId, uamPool.getAMRMClientRelayer(subClusterId));
+
+ // RegisterApplicationMaster
+ RegisterApplicationMasterResponse uamResponse =
+ uamPool.registerApplicationMaster(subClusterId, amRegistrationRequest);
+
+ return new TokenAndRegisterResponse(token, uamResponse);
+ }
+
/**
* Prepare the base allocation response. Use lastSCResponse and
* lastHeartbeatTimeStamp to assemble entries about cluster-wide info, e.g.
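For readers unfamiliar with FederationActionRetry, a minimal sketch of the retry pattern used above. It assumes, as the code above implies, that the interface is functional, that its action receives the current attempt number, and that runWithRetries(count, intervalMs) re-invokes the action until it succeeds or the attempts are exhausted; the example class and flakyAction method are hypothetical:

```java
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;

public class RegisterRetryPatternExample {
  private static int calls = 0;

  public static void main(String[] args) throws Exception {
    // Re-run a transiently failing action up to 3 times, sleeping 100 ms
    // between attempts (the defaults introduced by this patch).
    String result = ((FederationActionRetry<String>) (retry) -> flakyAction())
        .runWithRetries(3, 100);
    System.out.println(result); // prints "ok" once an attempt succeeds
  }

  // Hypothetical flaky action: fails on its first invocation, then succeeds.
  private static String flakyAction() throws YarnException {
    if (calls++ == 0) {
      throw new YarnException("transient failure, retried by runWithRetries");
    }
    return "ok";
  }
}
```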
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java
new file mode 100644
index 00000000000000..d67ecab9a99122
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+/**
+ * This class contains information about the AMRM token and the RegisterApplicationMasterResponse.
+ */
+public class TokenAndRegisterResponse {
+ private Token<AMRMTokenIdentifier> token;
+ private RegisterApplicationMasterResponse response;
+
+ public TokenAndRegisterResponse(Token<AMRMTokenIdentifier> pToken,
+ RegisterApplicationMasterResponse pResponse) {
+ this.token = pToken;
+ this.response = pResponse;
+ }
+
+ public Token<AMRMTokenIdentifier> getToken() {
+ return token;
+ }
+
+ public RegisterApplicationMasterResponse getResponse() {
+ return response;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 9e875437234a16..9e3e73f7f99956 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -38,7 +38,6 @@
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -179,9 +178,8 @@ protected YarnConfiguration createConfiguration() {
conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
500);
- // Wait UAM Register Down
- conf.setBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, true);
-
+ // Use a 1 ms interval between UAM register retries to keep the test fast.
+ conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL, 1);
return conf;
}
@@ -597,10 +595,6 @@ public Object run() throws Exception {
interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
-
- // Waiting for SC-1 to time out.
- GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);
-
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
@@ -859,7 +853,7 @@ public Object run() throws Exception {
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) {
- LOG.info("Allocated container {}", c.getId());
+ LOG.info("Allocated container " + c.getId());
}
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
@@ -893,10 +887,6 @@ public Object run() throws Exception {
int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
-
- // Waiting for SC-1 to time out.
- GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);
-
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
Assert.assertEquals(numberOfContainers,
@@ -1444,4 +1434,53 @@ private void finishApplication() throws IOException, YarnException {
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
}
+
+ @Test
+ public void testLaunchUAMAndRegisterApplicationMasterRetry() throws Exception {
+
+ UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+ interceptor.setRetryCount(2);
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+ // Register the application
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(0);
+ registerReq.setTrackingUrl("");
+
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
+
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the first batch of containers, with sc1 active
+ registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+ int numberOfContainers = 3;
+ List<Container> containers = getContainersAndAssert(numberOfContainers, numberOfContainers);
+ Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+ // Release all containers
+ releaseContainersAndAssert(containers);
+
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ FinishApplicationMasterResponse finishResponse =
+ interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finishResponse);
+ Assert.assertTrue(finishResponse.getIsUnregistered());
+
+ return null;
+ });
+
+ Assert.assertEquals(0, interceptor.getRetryCount());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index a15587442836f3..070131ef39aacb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -55,6 +55,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
private MockResourceManagerFacade mockRm;
private boolean isClientRPC = false;
+ private int retryCount = 0;
public TestableFederationInterceptor() {
}
@@ -258,6 +259,24 @@ protected <T> T createRMProxy(Class<T> protocol, Configuration config,
}
}
+ @Override
+ protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster(YarnConfiguration config,
+ String subClusterId, ApplicationId applicationId) throws IOException, YarnException {
+ if (retryCount > 0) {
+ retryCount--;
+ throw new YarnException("launchUAMAndRegisterApplicationMaster will retry");
+ }
+ return super.launchUAMAndRegisterApplicationMaster(config, subClusterId, applicationId);
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public int getRetryCount() {
+ return retryCount;
+ }
+
/**
* Wrap the handler thread, so it calls from the same user.
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSQueueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSQueueConverter.java
index 7d615bbb2540b3..b1cc4a6ccdf598 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSQueueConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSQueueConverter.java
@@ -344,6 +344,10 @@ public void testAutoCreateV2FlagsInWeightMode() {
assertTrue("root.admins autocreate v2 flag",
csConfig.getBoolean(
PREFIX + "root.admins.auto-queue-creation-v2.enabled", false));
+ assertTrue("root.admins.alice autocreate v2 flag",
+ csConfig.getBoolean(
+ PREFIX + "root.admins.alice.auto-queue-creation-v2.enabled",
+ false));
assertTrue("root.users autocreate v2 flag",
csConfig.getBoolean(
PREFIX + "root.users.auto-queue-creation-v2.enabled", false));
@@ -351,13 +355,15 @@ public void testAutoCreateV2FlagsInWeightMode() {
csConfig.getBoolean(
PREFIX + "root.misc.auto-queue-creation-v2.enabled", false));
+ // Leaf queue root.admins.alice is removed from the list below:
+ // adding a reservation to a leaf queue changes its queueType to FSParentQueue.
Set<String> leafs = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root",
- "root.default",
"root.admins",
"root.users",
- "root.misc"));
- assertNoValueForQueues(leafs, "auto-queue-creation-v2.enabled",
+ "root.misc",
+ "root.admins.alice"));
+ assertNoValueForQueues(leafs, ".auto-queue-creation-v2.enabled",
csConfig);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
index 8defc73b3ea278..e7cc024b64001e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
@@ -98,6 +98,8 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Extends the {@code AbstractRequestInterceptorClient} class and provides an
@@ -107,25 +109,26 @@
*/
public class DefaultClientRequestInterceptor
extends AbstractClientRequestInterceptor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
private ApplicationClientProtocol clientRMProxy;
@Override
public void init(String userName) {
super.init(userName);
-
- final Configuration conf = this.getConf();
try {
- clientRMProxy =
- user.doAs(new PrivilegedExceptionAction() {
- @Override
- public ApplicationClientProtocol run() throws Exception {
- return ClientRMProxy.createRMProxy(conf,
- ApplicationClientProtocol.class);
- }
- });
+ final Configuration conf = this.getConf();
+ clientRMProxy = user.doAs(
+ (PrivilegedExceptionAction<ApplicationClientProtocol>) () ->
+ ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class));
} catch (Exception e) {
- throw new YarnRuntimeException(
- "Unable to create the interface to reach the YarnRM", e);
+ StringBuilder message = new StringBuilder();
+ message.append("Error while creating Router RMClient Service");
+ if (user != null) {
+ message.append(", user: " + user);
+ }
+ LOG.error(message.toString(), e);
+ throw new YarnRuntimeException(message.toString(), e);
}
}
@@ -355,6 +358,5 @@ public GetNodesToAttributesResponse getNodesToAttributes(
@VisibleForTesting
public void setRMClient(ApplicationClientProtocol clientRM) {
this.clientRMProxy = clientRM;
-
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
index 8388af66d288ed..df5b4d5835df9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
@@ -80,38 +80,18 @@ public class DefaultRMAdminRequestInterceptor
public void init(String userName) {
super.init(userName);
try {
- // Do not create a proxy user if user name matches the user name on
- // current UGI
- if (UserGroupInformation.isSecurityEnabled()) {
- user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
- } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
- user = UserGroupInformation.getCurrentUser();
- } else {
- user = UserGroupInformation.createProxyUser(userName,
- UserGroupInformation.getCurrentUser());
- }
-
final Configuration conf = this.getConf();
-
rmAdminProxy = user.doAs(
- new PrivilegedExceptionAction() {
- @Override
- public ResourceManagerAdministrationProtocol run()
- throws Exception {
- return ClientRMProxy.createRMProxy(conf,
- ResourceManagerAdministrationProtocol.class);
- }
- });
- } catch (IOException e) {
- String message = "Error while creating Router RMAdmin Service for user:";
+ (PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>) () ->
+ ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class));
+ } catch (Exception e) {
+ StringBuilder message = new StringBuilder();
+ message.append("Error while creating Router RMAdmin Service");
if (user != null) {
- message += ", user: " + user;
+ message.append(", user: " + user);
}
-
- LOG.info(message);
- throw new YarnRuntimeException(message, e);
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
+ LOG.error(message.toString(), e);
+ throw new YarnRuntimeException(message.toString(), e);
}
}
@@ -126,7 +106,6 @@ public void setNextInterceptor(RMAdminRequestInterceptor next) {
@VisibleForTesting
public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) {
this.rmAdminProxy = rmAdmin;
-
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
index 918865da4675e7..9d3e3be6f6e53d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
@@ -103,6 +103,7 @@ protected SubClusterId getSubClusterId() {
@Override
public void init(String user) {
+ super.init(user);
webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(getConf());
client = RouterWebServiceUtil.createJerseyClient(getConf());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index d4e1b5145cf0dc..f162ab6be65935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -211,7 +211,7 @@ public Response createNewApplication(HttpServletRequest hsr)
validateRunning();
ApplicationId applicationId =
- ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
+ ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
applicationCounter.incrementAndGet());
NewApplication appId =
new NewApplication(applicationId.toString(), new ResourceInfo());
@@ -275,7 +275,7 @@ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
AppInfo appInfo = new AppInfo();
appInfo.setAppId(
- ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
+ ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
applicationCounter.incrementAndGet()).toString());
appInfo.setAMHostHttpAddress("http://i_am_the_AM:1234");
@@ -316,7 +316,7 @@ public NodeInfo getNode(String nodeId) {
if (nodeId.contains(subClusterId) || nodeId.contains("test")) {
node = new NodeInfo();
node.setId(nodeId);
- node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
+ node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
}
return node;
}
@@ -328,7 +328,7 @@ public NodesInfo getNodes(String states) {
}
NodeInfo node = new NodeInfo();
node.setId("Node " + Integer.valueOf(getSubClusterId().getId()));
- node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
+ node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
NodesInfo nodes = new NodesInfo();
nodes.add(node);
return nodes;
@@ -350,12 +350,12 @@ public ClusterMetricsInfo getClusterMetricsInfo() {
throw new RuntimeException("RM is stopped");
}
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
- metrics.setAppsSubmitted(Integer.valueOf(getSubClusterId().getId()));
- metrics.setAppsCompleted(Integer.valueOf(getSubClusterId().getId()));
- metrics.setAppsPending(Integer.valueOf(getSubClusterId().getId()));
- metrics.setAppsRunning(Integer.valueOf(getSubClusterId().getId()));
- metrics.setAppsFailed(Integer.valueOf(getSubClusterId().getId()));
- metrics.setAppsKilled(Integer.valueOf(getSubClusterId().getId()));
+ metrics.setAppsSubmitted(Integer.parseInt(getSubClusterId().getId()));
+ metrics.setAppsCompleted(Integer.parseInt(getSubClusterId().getId()));
+ metrics.setAppsPending(Integer.parseInt(getSubClusterId().getId()));
+ metrics.setAppsRunning(Integer.parseInt(getSubClusterId().getId()));
+ metrics.setAppsFailed(Integer.parseInt(getSubClusterId().getId()));
+ metrics.setAppsKilled(Integer.parseInt(getSubClusterId().getId()));
return metrics;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
index 1209eb95c1281e..e24f30e70958e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
@@ -157,7 +157,7 @@ Configuration
To configure the `YARN` to use the `Federation`, set the following property in the **conf/yarn-site.xml**:
-###EVERYWHERE:
+### EVERYWHERE:
These are common configurations that should appear in the **conf/yarn-site.xml** at each machine in the federation.
@@ -167,7 +167,7 @@ These are common configurations that should appear in the **conf/yarn-site.xml**
|`yarn.federation.enabled` | `true` | Whether federation is enabled or not |
|`yarn.resourcemanager.cluster-id` | `` | The unique subcluster identifier for this RM (same as the one used for HA). |
-####State-Store:
+#### State-Store:
Currently, we support ZooKeeper and SQL based implementations of the state-store.
@@ -192,7 +192,7 @@ SQL: one must setup the following parameters:
We provide scripts for **MySQL** and **Microsoft SQL Server**.
-> MySQL
+- MySQL
For MySQL, one must download the latest jar version 5.x from [MVN Repository](https://mvnrepository.com/artifact/mysql/mysql-connector-java) and add it to the CLASSPATH.
Then the DB schema is created by executing the following SQL scripts in the database:
@@ -211,7 +211,7 @@ In the same directory we provide scripts to drop the Stored Procedures, the Tabl
1. MySQL 5.7
2. MySQL 8.0
-> Microsoft SQL Server
+- Microsoft SQL Server
For SQL-Server, the process is similar, but the jdbc driver is already included.
SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**.
@@ -221,10 +221,10 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**.
1. SQL Server 2008 R2 Enterprise
2. SQL Server 2012 Enterprise
3. SQL Server 2016 Enterprise
-4. SQL Server 2017 Enterprise
+4. SQL Server 2017 Enterprise
5. SQL Server 2019 Enterprise
-####Optional:
+#### Optional:
| Property | Example | Description |
|:---- |:---- |:---- |
@@ -235,7 +235,88 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**.
|`yarn.federation.subcluster-resolver.class` | `org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl` | The class used to resolve which subcluster a node belongs to, and which subcluster(s) a rack belongs to. |
|`yarn.federation.machine-list` | `` | Path of machine-list file used by `SubClusterResolver`. Each line of the file is a node with sub-cluster and rack information. Below is the example: node1, subcluster1, rack1 node2, subcluster2, rack1 node3, subcluster3, rack2 node4, subcluster3, rack2 |
-###ON RMs:
+**How to configure the policy-manager?**
+
+- Router Policy
+
+ Router Policy defines the logic for routing an application submission and determines the HomeSubCluster for the application.
+
+ - HashBasedRouterPolicy
+ - This policy selects a sub-cluster based on the hash of the job's queue name. It is particularly useful when dealing with a large number of queues in a system, providing a default behavior. Furthermore, it ensures that all jobs belonging to the same queue are consistently mapped to the same sub-cluster, which can improve locality and performance.
+ - LoadBasedRouterPolicy
+ - This is a simplified load-balancing policy implementation. The policy utilizes binary weights (0/1 values) to enable or disable each sub-cluster. It selects the sub-cluster with the least load to forward the application traffic, ensuring optimal distribution.
+ - LocalityRouterPolicy
+ - This policy selects the sub-cluster based on the node specified by the client for running its application. It follows these conditions:
+ - It succeeds if
+ - There are three AMContainerResourceRequests in the order NODE, RACK, ANY
+ - It falls back to WeightedRandomRouterPolicy if
+ - Null or empty AMContainerResourceRequests;
+ - One AMContainerResourceRequests and it has ANY as ResourceName;
+ - The node is in blacklisted SubClusters.
+ - It fails if
+ - The node does not exist and RelaxLocality is False;
+ - We have an invalid number (not 0, 1 or 3) of resource requests
+ - RejectRouterPolicy
+ - This policy simply rejects all incoming requests.
+ - UniformRandomRouterPolicy
+ - This simple policy picks at uniform random among any of the currently active sub-clusters. This policy is easy to use and good for testing.
+ - WeightedRandomRouterPolicy
+ - This policy implements a weighted random sample among currently active sub-clusters.
+
+- AMRM Policy
+
+ AMRM Policy defines the logic used by the AMRM Proxy to split the resource request list received from the AM among the RMs.
+
+ - BroadcastAMRMProxyPolicy
+ - This policy simply broadcasts each ResourceRequest to all the available sub-clusters.
+ - HomeAMRMProxyPolicy
+ - This policy simply sends the ResourceRequest to the home sub-cluster.
+ - LocalityMulticastAMRMProxyPolicy
+ - Host localized ResourceRequests are always forwarded to the RM that owns the corresponding node, based on the feedback of a SubClusterResolver.
+ If the SubClusterResolver cannot resolve this node, we default to forwarding the ResourceRequest to the home sub-cluster.
+ - Rack localized ResourceRequests are forwarded to the RMs that own the corresponding rack. Note that in some deployments each rack could be
+ striped across multiple RMs. This policy respects that. If the SubClusterResolver cannot resolve this rack, we default to forwarding
+ the ResourceRequest to the home sub-cluster.
+ - ANY requests corresponding to node/rack local requests are forwarded only to the set of RMs that own the corresponding localized requests. The number of
+ containers listed in each ANY is proportional to the number of localized container requests (associated to this ANY via the same allocateRequestId).
+ - RejectAMRMProxyPolicy
+ - This policy simply rejects all requests. Useful to prevent apps from accessing any sub-cluster.
+
+- Policy Manager
+
+ The PolicyManager provides a combination of a RouterPolicy and an AMRMPolicy.
+
+ We can set the policy-manager like this:
+ ```xml
+ <property>
+   <name>yarn.federation.policy-manager</name>
+   <value>org.apache.hadoop.yarn.server.federation.policies.manager.HashBroadcastPolicyManager</value>
+ </property>
+ ```
+
+ - HashBroadcastPolicyManager
+ - Policy that routes applications via hashing of their queue name, and broadcasts resource requests. This picks a HashBasedRouterPolicy for the router and a BroadcastAMRMProxyPolicy for the amrmproxy as they are designed to work together.
+ - HomePolicyManager
+ - Policy manager which uses the UniformRandomRouterPolicy for the Router and HomeAMRMProxyPolicy as the AMRMProxy policy to find the RM.
+ - PriorityBroadcastPolicyManager
+ - Policy that allows operator to configure "weights" for routing. This picks a PriorityRouterPolicy for the router and a BroadcastAMRMProxyPolicy for the amrmproxy as they are designed to work together.
+ - RejectAllPolicyManager
+ - This policy rejects all requests for both router and amrmproxy routing. This picks a RejectRouterPolicy for the router and a RejectAMRMProxyPolicy for the amrmproxy as they are designed to work together.
+ - UniformBroadcastPolicyManager
+ - It combines the basic policies: UniformRandomRouterPolicy and BroadcastAMRMProxyPolicy, which are designed to work together and "spread" the load among sub-clusters uniformly. This simple policy might impose heavy load on the RMs and return more containers than a job requested as all requests are (replicated and) broadcasted.
+ - WeightedLocalityPolicyManager
+ - Policy that allows operator to configure "weights" for routing. This picks a LocalityRouterPolicy for the router and a LocalityMulticastAMRMProxyPolicy for the amrmproxy as they are designed to work together.
+
+### ON RMs:
These are extra configurations that should appear in the **conf/yarn-site.xml** at each ResourceManager.