From a822a3c70bac8bb25b7b5b926030a2cd9499f52e Mon Sep 17 00:00:00 2001 From: Masatake Iwasaki Date: Tue, 11 Jul 2023 15:38:09 +0900 Subject: [PATCH 1/8] YARN-11528. Lock triple-beam to the version compatible with node.js 12 to avoid compilation error. (#5827). Contributed by Masatake Iwasaki Reviewed-by: Shilun Fan Signed-off-by: Ayush Saxena --- .../hadoop-yarn-applications-catalog-webapp/package.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-catalog/hadoop-yarn-applications-catalog-webapp/package.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-catalog/hadoop-yarn-applications-catalog-webapp/package.json index f09442cfc4e87a..59cc3da179fd02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-catalog/hadoop-yarn-applications-catalog-webapp/package.json +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-catalog/hadoop-yarn-applications-catalog-webapp/package.json @@ -19,6 +19,9 @@ "shelljs": "^0.2.6", "apidoc": "0.17.7" }, + "resolutions": { + "triple-beam": "1.3.0" + }, "scripts": { "prestart": "npm install & mvn clean package", "pretest": "npm install" From 4e699f0383590d6c72cb3ee2294da29a4945922f Mon Sep 17 00:00:00 2001 From: huhaiyang Date: Tue, 11 Jul 2023 15:46:06 +0800 Subject: [PATCH 2/8] HDFS-17076. Remove the unused method isSlownodeByNameserviceId in DataNode (#5824). Contributed by Haiyang Hu. Signed-off-by: Ayush Saxena --- .../hadoop/hdfs/server/datanode/BlockPoolManager.java | 7 ------- .../org/apache/hadoop/hdfs/server/datanode/DataNode.java | 4 ---- 2 files changed, 11 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java index 073576546c790e..1611c3c9ce532a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java @@ -308,13 +308,6 @@ Map getBpByNameserviceId() { return bpByNameserviceId; } - boolean isSlownodeByNameserviceId(String nsId) { - if (bpByNameserviceId.containsKey(nsId)) { - return bpByNameserviceId.get(nsId).isSlownode(); - } - return false; - } - boolean isSlownodeByBlockPoolId(String bpId) { if (bpByBlockPoolId.containsKey(bpId)) { return bpByBlockPoolId.get(bpId).isSlownode(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 41088cf59ce4b2..2096f18d31abff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -4240,10 +4240,6 @@ public DataSetLockManager getDataSetLockManager() { return dataSetLockManager; } - boolean isSlownodeByNameserviceId(String nsId) { - return blockPoolManager.isSlownodeByNameserviceId(nsId); - } - boolean isSlownodeByBlockPoolId(String bpId) { return blockPoolManager.isSlownodeByBlockPoolId(bpId); } From 6843f8e4e0f174e3df4cfd06d8bb65cfce31eaa8 Mon Sep 17 00:00:00 2001 From: WangYuanben <48795318+YuanbenWang@users.noreply.github.com> 
Date: Tue, 11 Jul 2023 19:09:50 +0800
Subject: [PATCH 3/8] HADOOP-18794. ipc.server.handler.queue.size missing from
 core-default.xml (#5819). Contributed by WangYuanben.

Reviewed-by: Hualong Zhang
Reviewed-by: Shilun Fan
Signed-off-by: Ayush Saxena
---
 .../hadoop-common/src/main/resources/core-default.xml | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 438e2df1372065..dd543deb8a5a53 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2321,6 +2321,15 @@
     The switch to turn S3A auditing on or off.
   </description>
 </property>
 
+<property>
+  <name>ipc.server.handler.queue.size</name>
+  <value>100</value>
+  <description>
+    Indicates how many calls per handler are allowed in the queue. The maximum
+    call queue size is this value multiplied by the number of handler threads.
+  </description>
+</property>
+
 <property>
   <name>ipc.server.listen.queue.size</name>
   <value>256</value>

From fac7d26c5d7f791565cc3ab45d079e2cca725f95 Mon Sep 17 00:00:00 2001
From: Mehakmeet Singh
Date: Tue, 11 Jul 2023 17:57:05 +0530
Subject: [PATCH 4/8] HADOOP-18781. ABFS backReference passed down to streams
 to avoid GC closing the FS. (#5780)

To avoid the ABFS instance getting closed due to GC while the streams are
working, attach the ABFS instance to an opaque backReference object and pass
it down to the streams, so that a hard reference is held for as long as the
streams are working.

Contributed by: Mehakmeet Singh
---
 .../apache/hadoop/fs/impl/BackReference.java  | 48 ++++++++++++
 .../fs/azurebfs/AzureBlobFileSystem.java      |  9 ++-
 .../fs/azurebfs/AzureBlobFileSystemStore.java | 14 ++++
 .../fs/azurebfs/services/AbfsInputStream.java | 10 +++
 .../services/AbfsInputStreamContext.java      | 15 ++++
 .../azurebfs/services/AbfsOutputStream.java   | 21 +++++
 .../services/AbfsOutputStreamContext.java     | 14 ++++
 .../services/ITestAbfsInputStream.java        | 21 +++++
 .../services/ITestAbfsOutputStream.java       | 78 +++++++++++++++++++
 9 files changed, 228 insertions(+), 2 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java
new file mode 100644
index 00000000000000..04e39875b60bfc
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import javax.annotation.Nullable; + +/** + * Holds reference to an object to be attached to a stream or store to avoid + * the reference being lost to GC. + */ +public class BackReference { + private final Object reference; + + public BackReference(@Nullable Object reference) { + this.reference = reference; + } + + /** + * is the reference null? + * @return true if the ref. is null, else false. + */ + public boolean isNull() { + return reference == null; + } + + @Override + public String toString() { + return "BackReference{" + + "reference=" + reference + + '}'; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 5fb2c6e1700a81..426ad8ca1e1912 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -45,6 +45,7 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -155,6 +156,9 @@ public class AzureBlobFileSystem extends FileSystem /** Rate limiting for operations which use it to throttle their IO. */ private RateLimiting rateLimiting; + /** Storing full path uri for better logging. */ + private URI fullPathUri; + @Override public void initialize(URI uri, Configuration configuration) throws IOException { @@ -165,7 +169,7 @@ public void initialize(URI uri, Configuration configuration) setConf(configuration); LOG.debug("Initializing AzureBlobFileSystem for {}", uri); - + this.fullPathUri = uri; this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); abfsCounters = new AbfsCountersImpl(uri); // name of the blockFactory to be used. 
@@ -192,6 +196,7 @@ public void initialize(URI uri, Configuration configuration) .withAbfsCounters(abfsCounters) .withBlockFactory(blockFactory) .withBlockOutputActiveBlocks(blockOutputActiveBlocks) + .withBackReference(new BackReference(this)) .build(); this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder); @@ -236,7 +241,7 @@ public void initialize(URI uri, Configuration configuration) public String toString() { final StringBuilder sb = new StringBuilder( "AzureBlobFileSystem{"); - sb.append("uri=").append(uri); + sb.append("uri=").append(fullPathUri); sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]"); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 79ffc796c3ae59..5c06270fa5bb81 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; @@ -189,6 +190,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { /** Bounded ThreadPool for this instance. */ private ExecutorService boundedThreadPool; + /** ABFS instance reference to be held by the store to avoid GC close. */ + private BackReference fsBackRef; + /** * FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations. 
* Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters @@ -202,6 +206,7 @@ public AzureBlobFileSystemStore( String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; final String accountName = authorityParts[1]; + this.fsBackRef = abfsStoreBuilder.fsBackRef; leaseRefs = Collections.synchronizedMap(new WeakHashMap<>()); @@ -711,6 +716,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool, blockOutputActiveBlocks, true)) .withTracingContext(tracingContext) + .withAbfsBackRef(fsBackRef) .build(); } @@ -818,6 +824,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( abfsConfiguration.shouldReadBufferSizeAlways()) .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize()) .withBufferedPreadDisabled(bufferedPreadDisabled) + .withAbfsBackRef(fsBackRef) .build(); } @@ -1871,6 +1878,7 @@ public static final class AzureBlobFileSystemStoreBuilder { private AbfsCounters abfsCounters; private DataBlocks.BlockFactory blockFactory; private int blockOutputActiveBlocks; + private BackReference fsBackRef; public AzureBlobFileSystemStoreBuilder withUri(URI value) { this.uri = value; @@ -1906,6 +1914,12 @@ public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks( return this; } + public AzureBlobFileSystemStoreBuilder withBackReference( + BackReference fsBackRef) { + this.fsBackRef = fsBackRef; + return this; + } + public AzureBlobFileSystemStoreBuilder build() { return this; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index fdeaf701775712..86442dac9aaf70 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -121,6 +122,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, */ private long nextReadPos; + /** ABFS instance to be held by the input stream to avoid GC close. 
*/ + private final BackReference fsBackRef; + public AbfsInputStream( final AbfsClient client, final Statistics statistics, @@ -152,6 +156,7 @@ public AbfsInputStream( this.tracingContext.setStreamID(inputStreamId); this.context = abfsInputStreamContext; readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize(); + this.fsBackRef = abfsInputStreamContext.getFsBackRef(); // Propagate the config values to ReadBufferManager so that the first instance // to initialize can set the readAheadBlockSize @@ -857,4 +862,9 @@ long getFCursorAfterLastRead() { long getLimit() { return this.limit; } + + @VisibleForTesting + BackReference getFsBackRef() { + return fsBackRef; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index e258958b1a1116..b78a899340f875 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -20,6 +20,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; /** @@ -51,6 +53,9 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean bufferedPreadDisabled; + /** A BackReference to the FS instance that created this OutputStream. */ + private BackReference fsBackRef; + public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -122,6 +127,12 @@ public AbfsInputStreamContext withBufferedPreadDisabled( return this; } + public AbfsInputStreamContext withAbfsBackRef( + final BackReference fsBackRef) { + this.fsBackRef = fsBackRef; + return this; + } + public AbfsInputStreamContext build() { if (readBufferSize > readAheadBlockSize) { LOG.debug( @@ -180,4 +191,8 @@ public int getReadAheadBlockSize() { public boolean isBufferedPreadDisabled() { return bufferedPreadDisabled; } + + public BackReference getFsBackRef() { + return fsBackRef; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 82e20ce5b76842..4268dc3f918a12 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; @@ -126,6 +127,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, /** Executor service to carry out the parallel upload requests. */ private final ListeningExecutorService executorService; + /** ABFS instance to be held by the output stream to avoid GC close. 
*/ + private final BackReference fsBackRef; + public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) throws IOException { this.client = abfsOutputStreamContext.getClient(); @@ -147,6 +151,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) this.numOfAppendsToServerSinceLastFlush = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); + this.fsBackRef = abfsOutputStreamContext.getFsBackRef(); if (this.isAppendBlob) { this.maxConcurrentRequestCount = 1; @@ -488,6 +493,12 @@ public synchronized void close() throws IOException { } try { + // Check if Executor Service got shutdown before the writes could be + // completed. + if (hasActiveBlockDataToUpload() && executorService.isShutdown()) { + throw new PathIOException(path, "Executor Service closed before " + + "writes could be completed."); + } flushInternal(true); } catch (IOException e) { // Problems surface in try-with-resources clauses if @@ -766,4 +777,14 @@ public String toString() { sb.append("}"); return sb.toString(); } + + @VisibleForTesting + BackReference getFsBackRef() { + return fsBackRef; + } + + @VisibleForTesting + ListeningExecutorService getExecutorService() { + return executorService; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index ed897330367418..1d1a99c7d9f6f9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.fs.store.DataBlocks; /** @@ -65,6 +66,9 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private TracingContext tracingContext; + /** A BackReference to the FS instance that created this OutputStream. */ + private BackReference fsBackRef; + public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -157,6 +161,12 @@ public AbfsOutputStreamContext withTracingContext( return this; } + public AbfsOutputStreamContext withAbfsBackRef( + final BackReference fsBackRef) { + this.fsBackRef = fsBackRef; + return this; + } + public AbfsOutputStreamContext build() { // Validation of parameters to be done here. 
if (streamStatistics == null) { @@ -261,4 +271,8 @@ public ExecutorService getExecutorService() { public TracingContext getTracingContext() { return tracingContext; } + + public BackReference getFsBackRef() { + return fsBackRef; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 66f072501dc4db..2ac58fbcb1668e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -32,6 +32,8 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import org.assertj.core.api.Assertions; import org.junit.Test; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; @@ -106,6 +108,25 @@ public void testExceptionInOptimization() throws Exception { } } + /** + * Testing the back reference being passed down to AbfsInputStream. + */ + @Test + public void testAzureBlobFileSystemBackReferenceInInputStream() + throws IOException { + Path path = path(getMethodName()); + // Create a file then open it to verify if this input stream contains any + // back reference. + try (FSDataOutputStream out = getFileSystem().create(path); + FSDataInputStream in = getFileSystem().open(path)) { + AbfsInputStream abfsInputStream = (AbfsInputStream) in.getWrappedStream(); + + Assertions.assertThat(abfsInputStream.getFsBackRef().isNull()) + .describedAs("BackReference in input stream should not be null") + .isFalse(); + } + } + private void testExceptionInOptimization(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, final byte[] fileContent) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index 431c456ae3daaf..eee0c177c33b33 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -18,19 +18,28 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + import org.assertj.core.api.Assertions; import org.junit.Test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.test.LambdaTestUtils; /** * Test create operation. 
*/
 public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_EXECUTION_TIMEOUT = 2 * 60 * 1000;
   private static final String TEST_FILE_PATH = "testfile";
 
   public ITestAbfsOutputStream() throws Exception {
@@ -84,4 +93,73 @@ public void testMaxRequestsAndQueueCapacity() throws Exception {
     }
   }
 
+  /**
+   * Verify the passing of the AzureBlobFileSystem reference to AbfsOutputStream
+   * to make sure that the FS instance is not eligible for GC while writing.
+   */
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testAzureBlobFileSystemBackReferenceInOutputStream()
+      throws Exception {
+    byte[] testBytes = new byte[5 * 1024];
+    // Creating an output stream using a FS in a separate method to make the
+    // FS instance used eligible for GC. When a method is popped from the
+    // stack frame, its variables become unreachable, which creates a higher
+    // chance of the FS instance getting garbage collected.
+    try (AbfsOutputStream out = getStream()) {
+      // Every 5KB block written is flushed and a GC is hinted; if the
+      // executor service is shut down in between, the test should fail,
+      // indicating premature shutdown while writing.
+      for (int i = 0; i < 5; i++) {
+        out.write(testBytes);
+        out.flush();
+        System.gc();
+        Assertions.assertThat(
+            out.getExecutorService().isShutdown() || out.getExecutorService()
+                .isTerminated())
+            .describedAs("Executor Service should not be closed before "
+                + "OutputStream while writing")
+            .isFalse();
+        Assertions.assertThat(out.getFsBackRef().isNull())
+            .describedAs("BackReference in output stream should not be null")
+            .isFalse();
+      }
+    }
+  }
+
+  /**
+   * Verify AbfsOutputStream close() behaviour of throwing a PathIOException
+   * when the FS instance is closed before the stream.
+   */
+  @Test
+  public void testAbfsOutputStreamClosingFsBeforeStream()
+      throws Exception {
+    AzureBlobFileSystem fs = new AzureBlobFileSystem();
+    fs.initialize(new URI(getTestUrl()), new Configuration());
+    Path pathFs = path(getMethodName());
+    byte[] inputBytes = new byte[5 * 1024];
+    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+        pathFs)) {
+      out.write(inputBytes);
+      fs.close();
+      // Verify that closing the output stream after fs.close() raises a
+      // PathIOException containing the path being written to.
+      LambdaTestUtils
+          .intercept(PathIOException.class, getMethodName(), out::close);
+    }
+  }
+
+  /**
+   * Separate method to create an outputStream using a local FS instance so
+   * that once this method has returned, the FS instance can be eligible for GC.
+   *
+   * @return AbfsOutputStream used for writing.
+   */
+  private AbfsOutputStream getStream() throws URISyntaxException, IOException {
+    AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+    fs1.initialize(new URI(getTestUrl()), new Configuration());
+    Path pathFs1 = path(getMethodName() + "1");
+
+    return createAbfsOutputStreamWithFlushEnabled(fs1, pathFs1);
+  }
+
 }

From c13d92996dfd7e18761ed47ad22e78f42ce64746 Mon Sep 17 00:00:00 2001
From: Susheel Gupta <38013283+susheelgupta7@users.noreply.github.com>
Date: Tue, 11 Jul 2023 18:50:51 +0530
Subject: [PATCH 5/8] YARN-11464: TestFSQueueConverter#testAutoCreateV2FlagsInWeightMode
 has a missing dot before auto-queue-creation-v2.enabled for method call
 assertNoValueForQueues (#5792)

Co-authored-by: Susheel Gupta
---
 .../fair/converter/TestFSQueueConverter.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSQueueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSQueueConverter.java
index 7d615bbb2540b3..b1cc4a6ccdf598 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSQueueConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSQueueConverter.java
@@ -344,6 +344,10 @@ public void testAutoCreateV2FlagsInWeightMode() {
     assertTrue("root.admins autocreate v2 flag",
         csConfig.getBoolean(
             PREFIX + "root.admins.auto-queue-creation-v2.enabled", false));
+    assertTrue("root.admins.alice autocreate v2 flag",
+        csConfig.getBoolean(
+            PREFIX + "root.admins.alice.auto-queue-creation-v2.enabled",
+            false));
     assertTrue("root.users autocreate v2 flag",
         csConfig.getBoolean(
             PREFIX + "root.users.auto-queue-creation-v2.enabled", false));
@@ -351,13 +355,15 @@
         csConfig.getBoolean(
             PREFIX + "root.misc.auto-queue-creation-v2.enabled",
            false));
 
+    // leaf queue root.admins.alice is removed from the list below:
+    // adding a reservation to a leaf changes its queueType to FSParentQueue
     Set<String> leafs = Sets.difference(ALL_QUEUES,
         Sets.newHashSet("root",
-            "root.default", "root.admins", "root.users",
-            "root.misc"));
-    assertNoValueForQueues(leafs, "auto-queue-creation-v2.enabled",
+            "root.default", "root.admins", "root.users",
+            "root.misc",
+            "root.admins.alice"));
+    assertNoValueForQueues(leafs, ".auto-queue-creation-v2.enabled",
         csConfig);

From 33b1677e9e4e8aa4a41e7b7a242a3fbab7010f53 Mon Sep 17 00:00:00 2001
From: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Date: Wed, 12 Jul 2023 00:54:06 +0800
Subject: [PATCH 6/8] YARN-11524. Improve the Policy Description in Federation.md.
(#5797) --- .../src/site/markdown/Federation.md | 95 +++++++++++++++++-- 1 file changed, 88 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md index 1209eb95c1281e..e24f30e70958e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md @@ -157,7 +157,7 @@ Configuration To configure the `YARN` to use the `Federation`, set the following property in the **conf/yarn-site.xml**: -###EVERYWHERE: +### EVERYWHERE: These are common configurations that should appear in the **conf/yarn-site.xml** at each machine in the federation. @@ -167,7 +167,7 @@ These are common configurations that should appear in the **conf/yarn-site.xml** |`yarn.federation.enabled` | `true` | Whether federation is enabled or not | |`yarn.resourcemanager.cluster-id` | `` | The unique subcluster identifier for this RM (same as the one used for HA). | -####State-Store: +#### State-Store: Currently, we support ZooKeeper and SQL based implementations of the state-store. @@ -192,7 +192,7 @@ SQL: one must setup the following parameters: We provide scripts for **MySQL** and **Microsoft SQL Server**. -> MySQL +- MySQL For MySQL, one must download the latest jar version 5.x from [MVN Repository](https://mvnrepository.com/artifact/mysql/mysql-connector-java) and add it to the CLASSPATH. Then the DB schema is created by executing the following SQL scripts in the database: @@ -211,7 +211,7 @@ In the same directory we provide scripts to drop the Stored Procedures, the Tabl 1. MySQL 5.7 2. MySQL 8.0 -> Microsoft SQL Server +- Microsoft SQL Server For SQL-Server, the process is similar, but the jdbc driver is already included. SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**. @@ -221,10 +221,10 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**. 1. SQL Server 2008 R2 Enterprise 2. SQL Server 2012 Enterprise 3. SQL Server 2016 Enterprise -4. SQL Server 2017 Enterprise +4. SQL Server 2017 Enterprise 5. SQL Server 2019 Enterprise -####Optional: +#### Optional: | Property | Example | Description | |:---- |:---- |:---- | @@ -235,7 +235,88 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**. |`yarn.federation.subcluster-resolver.class` | `org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl` | The class used to resolve which subcluster a node belongs to, and which subcluster(s) a rack belongs to. | |`yarn.federation.machine-list` | `` | Path of machine-list file used by `SubClusterResolver`. Each line of the file is a node with sub-cluster and rack information. Below is the example:

node1, subcluster1, rack1
node2, subcluster2, rack1
node3, subcluster3, rack2
node4, subcluster3, rack2 |
+
+**How to configure the policy-manager?**
+
+- Router Policy
+
+  Router Policy defines the logic for routing an application submission, determining the HomeSubCluster for the application.
+
+  - HashBasedRouterPolicy
+    - This policy selects a sub-cluster based on the hash of the job's queue name. It is particularly useful when dealing with a large number of queues in a system, providing a default behavior. Furthermore, it ensures that all jobs belonging to the same queue are consistently mapped to the same sub-cluster, which can improve locality and performance.
+  - LoadBasedRouterPolicy
+    - This is a simplified load-balancing policy implementation. The policy uses binary weights (0/1 values) to enable or disable each sub-cluster. It selects the sub-cluster with the least load to forward the application traffic, ensuring optimal distribution.
+  - LocalityRouterPolicy
+    - This policy selects the sub-cluster based on the node specified by the client for running its application. It follows these conditions:
+      - It succeeds if
+        - There are three AMContainerResourceRequests in the order NODE, RACK, ANY.
+      - It falls back to WeightedRandomRouterPolicy if
+        - The AMContainerResourceRequests are null or empty;
+        - There is one AMContainerResourceRequest and it has ANY as ResourceName;
+        - The node is in a blacklisted SubCluster.
+      - It fails if
+        - The node does not exist and RelaxLocality is False;
+        - There is an invalid number (not 0, 1 or 3) of resource requests.
+  - RejectRouterPolicy
+    - This policy simply rejects all incoming requests.
+  - UniformRandomRouterPolicy
+    - This simple policy picks uniformly at random among any of the currently active sub-clusters. This policy is easy to use and good for testing.
+  - WeightedRandomRouterPolicy
+    - This policy implements a weighted random sample among currently active sub-clusters.
+
+- AMRM Policy
+
+  The AMRM policy defines the logic to split the resource request list received from the AM among the RMs.
+
+  - BroadcastAMRMProxyPolicy
+    - This policy simply broadcasts each ResourceRequest to all the available sub-clusters.
+  - HomeAMRMProxyPolicy
+    - This policy simply sends the ResourceRequest to the home sub-cluster.
+  - LocalityMulticastAMRMProxyPolicy
+    - Host-localized ResourceRequests are always forwarded to the RM that owns the corresponding node, based on the feedback of a SubClusterResolver. If the SubClusterResolver cannot resolve this node, we default to forwarding the ResourceRequest to the home sub-cluster.
+    - Rack-localized ResourceRequests are forwarded to the RMs that own the corresponding rack. Note that in some deployments each rack could be striped across multiple RMs; this policy respects that. If the SubClusterResolver cannot resolve this rack, we default to forwarding the ResourceRequest to the home sub-cluster.
+    - ANY requests corresponding to node/rack local requests are forwarded only to the set of RMs that own the corresponding localized requests. The number of containers listed in each ANY is proportional to the number of localized container requests (associated to this ANY via the same allocateRequestId).
+  - RejectAMRMProxyPolicy
+    - This policy simply rejects all requests. Useful to prevent apps from accessing any sub-cluster.
+
+- Policy Manager
+
+  The PolicyManager provides a combination of a RouterPolicy and an AMRMPolicy.
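+
+  As an illustrative sketch (hypothetical names, not the actual Hadoop
+  interfaces), a policy manager can be thought of as nothing more than a
+  pairing of the two decisions described above:
+
+  ```java
+  import java.util.List;
+
+  // Hypothetical stand-ins for the router / AMRMProxy policy split.
+  interface RouterPolicySketch {
+    String chooseHomeSubCluster(String queue, List<String> activeSubClusters);
+  }
+  interface AmrmProxyPolicySketch {
+    List<String> chooseTargets(String resourceName, List<String> activeSubClusters);
+  }
+
+  // A "policy manager" bundles one policy of each kind.
+  class PolicyManagerSketch {
+    final RouterPolicySketch router;
+    final AmrmProxyPolicySketch amrmProxy;
+
+    PolicyManagerSketch(RouterPolicySketch router, AmrmProxyPolicySketch amrmProxy) {
+      this.router = router;
+      this.amrmProxy = amrmProxy;
+    }
+
+    // Hash-based routing paired with broadcast, in the spirit of
+    // HashBroadcastPolicyManager: all jobs of a queue share one home
+    // sub-cluster, and every resource request goes to every sub-cluster.
+    static PolicyManagerSketch hashBroadcast() {
+      return new PolicyManagerSketch(
+          (queue, active) ->
+              active.get((queue.hashCode() & Integer.MAX_VALUE) % active.size()),
+          (resourceName, active) -> active);
+    }
+  }
+  ```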
+ + We can set policy-manager like this: + ```xml + + + yarn.federation.policy-manager + org.apache.hadoop.yarn.server.federation.policies.manager.HashBroadcastPolicyManager + + ``` + + - HashBroadcastPolicyManager + - Policy that routes applications via hashing of their queuename, and broadcast resource requests. This picks a HashBasedRouterPolicy for the router and a BroadcastAMRMProxyPolicy for the amrmproxy as they are designed to work together. + - HomePolicyManager + - Policy manager which uses the UniformRandomRouterPolicy for the Router and HomeAMRMProxyPolicy as the AMRMProxy policy to find the RM. + - PriorityBroadcastPolicyManager + - Policy that allows operator to configure "weights" for routing. This picks a PriorityRouterPolicy for the router and a BroadcastAMRMProxyPolicy for the amrmproxy as they are designed to work together. + - RejectAllPolicyManager + - This policy rejects all requests for both router and amrmproxy routing. This picks a RejectRouterPolicy for the router and a RejectAMRMProxyPolicy for the amrmproxy as they are designed to work together. + - UniformBroadcastPolicyManager + - It combines the basic policies: UniformRandomRouterPolicy and BroadcastAMRMProxyPolicy, which are designed to work together and "spread" the load among sub-clusters uniformly. This simple policy might impose heavy load on the RMs and return more containers than a job requested as all requests are (replicated and) broadcasted. + - WeightedLocalityPolicyManager + - Policy that allows operator to configure "weights" for routing. This picks a LocalityRouterPolicy for the router and a LocalityMulticastAMRMProxyPolicy for the amrmproxy as they are designed to work together. + +### ON RMs: These are extra configurations that should appear in the **conf/yarn-site.xml** at each ResourceManager. From 8b88e9f8f4fa6a98fba71f1fb5bd8a674cc8a400 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 12 Jul 2023 09:47:07 +0800 Subject: [PATCH 7/8] YARN-11509. The FederationInterceptor#launchUAM Added retry logic. 
(#5727)
---
 .../hadoop/yarn/conf/YarnConfiguration.java   |  14 ++
 .../src/main/resources/yarn-default.xml       |  18 ++
 .../amrmproxy/FederationInterceptor.java      | 180 +++++++++++-------
 .../amrmproxy/TokenAndRegisterResponse.java   |  45 +++++
 .../amrmproxy/TestFederationInterceptor.java  |  65 +++++--
 .../TestableFederationInterceptor.java        |  19 ++
 6 files changed, 258 insertions(+), 83 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1e1fbb59939987..648fddbbbe90bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4058,6 +4058,20 @@ public static boolean isAclEnabled(Configuration conf) {
   public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
       60000; // one minute
 
+  // AMRMProxy register UAM retry count
+  public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+      FEDERATION_PREFIX + "amrmproxy.register.uam.retry-count";
+  // When registering a UAM, retry a maximum of 3 times.
+  public static final int DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+      3;
+
+  // AMRMProxy register UAM retry interval
+  public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+      FEDERATION_PREFIX + "amrmproxy.register.uam.interval";
+  // Retry interval, default 100 ms
+  public static final long DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+      TimeUnit.MILLISECONDS.toMillis(100);
+
   public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
   public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
       + "policy-manager";

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 132f08f6b07ef0..f722af852f4a8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5408,4 +5408,22 @@
     </description>
   </property>
+
+  <property>
+    <description>
+      The number of retries for registering a UAM.
+      The default value is 3.
+    </description>
+    <name>yarn.federation.amrmproxy.register.uam.retry-count</name>
+    <value>3</value>
+  </property>
+
+  <property>
+    <description>
+      The interval between retries for registering a UAM.
+      The default value is 100ms.
+    </description>
+    <name>yarn.federation.amrmproxy.register.uam.interval</name>
+    <value>100ms</value>
+  </property>
 
 </configuration>

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 14a2d60c2b5bc0..32c5bf217e233e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -87,6 +88,7 @@
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
@@ -251,6 +253,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   // the maximum wait time for the first async heart beat response
   private long heartbeatMaxWaitTimeMs;
 
+  private int registerUamRetryNum;
+
+  private long registerUamRetryInterval;
+
   private boolean waitUamRegisterDone;
 
   private MonotonicClock clock = new MonotonicClock();
@@ -355,6 +361,24 @@ public void init(AMRMProxyApplicationContext appContext) {
       this.subClusterTimeOut =
           YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
     }
+
+    this.registerUamRetryNum = conf.getInt(
+        YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+    if (this.registerUamRetryNum <= 0) {
+      LOG.info("{} configured to be {}, should be positive. Using default of {}.",
+          YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+          this.registerUamRetryNum,
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+      this.registerUamRetryNum =
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT;
+    }
+
+    this.registerUamRetryInterval = conf.getTimeDuration(
+        YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+        TimeUnit.MILLISECONDS);
+
     this.waitUamRegisterDone = conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE,
         YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE);
   }
@@ -701,7 +725,7 @@ public AllocateResponse allocate(AllocateRequest request)
 
     if (this.finishAMCalled) {
       LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat "
-          + "processing and return dummy response" + this.attemptId);
+          + "processing and return dummy response.", this.attemptId);
       return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
     }
 
@@ -1255,85 +1279,77 @@ private List<SubClusterId> registerAndAllocateWithNewSubClusters(
     // Check to see if there are any new sub-clusters in this request
     // list and create and register Unmanaged AM instance for the new ones
     List<SubClusterId> newSubClusters = new ArrayList<>();
-    for (SubClusterId subClusterId : requests.keySet()) {
-      if (!subClusterId.equals(this.homeSubClusterId)
-          && !this.uamPool.hasUAMId(subClusterId.getId())) {
-        newSubClusters.add(subClusterId);
+    requests.keySet().stream().forEach(subClusterId -> {
+      String id = subClusterId.getId();
+      if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(id)) {
+        newSubClusters.add(subClusterId);
         // Set sub-cluster to be timed out initially
-        lastSCResponseTime.put(subClusterId,
-            clock.getTime() - subClusterTimeOut);
+        lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
       }
-    }
+    });
 
     this.uamRegisterFutures.clear();
+
     for (final SubClusterId scId : newSubClusters) {
-      Future future = this.threadpool.submit(new Runnable() {
-        @Override
-        public void run() {
-          String subClusterId = scId.getId();
-
-          // Create a config loaded with federation on and subclusterId
-          // for each UAM
-          YarnConfiguration config = new YarnConfiguration(getConf());
-          FederationProxyProviderUtil.updateConfForFederation(config,
-              subClusterId);
-
-          RegisterApplicationMasterResponse uamResponse = null;
-          Token<AMRMTokenIdentifier> token = null;
-          try {
-            ApplicationId applicationId = attemptId.getApplicationId();
-            ApplicationSubmissionContext originalSubmissionContext =
-                federationFacade.getApplicationSubmissionContext(applicationId);
-
-            // For appNameSuffix, use subClusterId of the home sub-cluster
-            token = uamPool.launchUAM(subClusterId, config,
-                applicationId, amRegistrationResponse.getQueue(),
-                getApplicationContext().getUser(), homeSubClusterId.toString(),
-                true, subClusterId, originalSubmissionContext);
-
-            secondaryRelayers.put(subClusterId,
-                uamPool.getAMRMClientRelayer(subClusterId));
-
-            uamResponse = uamPool.registerApplicationMaster(subClusterId,
-                amRegistrationRequest);
-          } catch (Throwable e) {
-            LOG.error("Failed to register application master: " + subClusterId
-                + " Application: " + attemptId, e);
-            // TODO: UAM registration for this sub-cluster RM
-            // failed. For now, we ignore the resource requests and continue
-            // but we need to fix this and handle this situation. One way would
-            // be to send the request to another RM by consulting the policy.
- return; - } - uamRegistrations.put(scId, uamResponse); - LOG.info("Successfully registered unmanaged application master: " - + subClusterId + " ApplicationId: " + attemptId); - try { - uamPool.allocateAsync(subClusterId, requests.get(scId), - new HeartbeatCallBack(scId, true)); - } catch (Throwable e) { - LOG.error("Failed to allocate async to " + subClusterId - + " Application: " + attemptId, e); - } + Future future = this.threadpool.submit(() -> { - // Save the UAM token in registry or NMSS - try { - if (registryClient != null) { - registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(), - subClusterId, token); - } else if (getNMStateStore() != null) { - getNMStateStore().storeAMRMProxyAppContextEntry(attemptId, - NMSS_SECONDARY_SC_PREFIX + subClusterId, - token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT)); - } - } catch (Throwable e) { - LOG.error("Failed to persist UAM token from " + subClusterId - + " Application: " + attemptId, e); + String subClusterId = scId.getId(); + + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, subClusterId); + ApplicationId applicationId = attemptId.getApplicationId(); + + RegisterApplicationMasterResponse uamResponse; + Token token; + + // LaunchUAM And RegisterApplicationMaster + try { + TokenAndRegisterResponse result = + ((FederationActionRetry) (retryCount) -> + launchUAMAndRegisterApplicationMaster(config, subClusterId, applicationId)). + runWithRetries(registerUamRetryNum, registerUamRetryInterval); + + token = result.getToken(); + uamResponse = result.getResponse(); + } catch (Throwable e) { + LOG.error("Failed to register application master: {} Application: {}.", + subClusterId, attemptId, e); + return; + } + + uamRegistrations.put(scId, uamResponse); + + LOG.info("Successfully registered unmanaged application master: {} " + + "ApplicationId: {}.", subClusterId, attemptId); + + // Allocate Request + try { + uamPool.allocateAsync(subClusterId, requests.get(scId), + new HeartbeatCallBack(scId, true)); + } catch (Throwable e) { + LOG.error("Failed to allocate async to {} Application: {}.", + subClusterId, attemptId, e); + } + + // Save the UAM token in registry or NMSS + try { + if (registryClient != null) { + registryClient.writeAMRMTokenForUAM(applicationId, subClusterId, token); + } else if (getNMStateStore() != null) { + getNMStateStore().storeAMRMProxyAppContextEntry(attemptId, + NMSS_SECONDARY_SC_PREFIX + subClusterId, + token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT)); } + } catch (Throwable e) { + LOG.error("Failed to persist UAM token from {} Application {}", + subClusterId, attemptId, e); } }); + this.uamRegisterFutures.put(scId, future); } @@ -1347,10 +1363,34 @@ public void run() { } } - return newSubClusters; } + protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster( + YarnConfiguration config, String subClusterId, ApplicationId applicationId) + throws IOException, YarnException { + + // Prepare parameter information + ApplicationSubmissionContext originalSubmissionContext = + federationFacade.getApplicationSubmissionContext(applicationId); + String submitter = getApplicationContext().getUser(); + String homeRM = homeSubClusterId.toString(); + String queue = amRegistrationResponse.getQueue(); + + // For appNameSuffix, use subClusterId of the home sub-cluster + Token token = uamPool.launchUAM(subClusterId, config, applicationId, + queue, 
submitter, homeRM, true, subClusterId, originalSubmissionContext); + + // Set the relationship between SubCluster and AMRMClientRelayer. + secondaryRelayers.put(subClusterId, uamPool.getAMRMClientRelayer(subClusterId)); + + // RegisterApplicationMaster + RegisterApplicationMasterResponse uamResponse = + uamPool.registerApplicationMaster(subClusterId, amRegistrationRequest); + + return new TokenAndRegisterResponse(token, uamResponse); + } + /** * Prepare the base allocation response. Use lastSCResponse and * lastHeartbeatTimeStamp to assemble entries about cluster-wide info, e.g. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java new file mode 100644 index 00000000000000..d67ecab9a99122 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; + +/** + * This class contains information about the AMRM token and the RegisterApplicationMasterResponse. 
+ */ +public class TokenAndRegisterResponse { + private Token token; + private RegisterApplicationMasterResponse response; + + public TokenAndRegisterResponse(Token pToken, + RegisterApplicationMasterResponse pResponse) { + this.token = pToken; + this.response = pResponse; + } + + public Token getToken() { + return token; + } + + public RegisterApplicationMasterResponse getResponse() { + return response; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 9e875437234a16..9e3e73f7f99956 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -38,7 +38,6 @@ import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -179,9 +178,8 @@ protected YarnConfiguration createConfiguration() { conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, 500); - // Wait UAM Register Down - conf.setBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, true); - + // Register UAM Retry Interval 1ms + conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL, 1); return conf; } @@ -597,10 +595,6 @@ public Object run() throws Exception { interceptor.recover(recoveredDataMap); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - - // Waiting for SC-1 to time out. - GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000); - // SC1 should be initialized to be timed out Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); @@ -859,7 +853,7 @@ public Object run() throws Exception { List containers = getContainersAndAssert(numberOfContainers, numberOfContainers * 2); for (Container c : containers) { - LOG.info("Allocated container {}", c.getId()); + LOG.info("Allocated container " + c.getId()); } Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); @@ -893,10 +887,6 @@ public Object run() throws Exception { int numberOfContainers = 3; // Should re-attach secondaries and get the three running containers Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - - // Waiting for SC-1 to time out. 
- GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000); - // SC1 should be initialized to be timed out Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); Assert.assertEquals(numberOfContainers, @@ -1444,4 +1434,53 @@ private void finishApplication() throws IOException, YarnException { Assert.assertNotNull(finishResponse); Assert.assertTrue(finishResponse.getIsUnregistered()); } + + @Test + public void testLaunchUAMAndRegisterApplicationMasterRetry() throws Exception { + + UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); + interceptor.setRetryCount(2); + + ugi.doAs((PrivilegedExceptionAction) () -> { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the first batch of containers, with sc1 active + registerSubCluster(SubClusterId.newInstance("SC-1")); + + int numberOfContainers = 3; + List containers = getContainersAndAssert(numberOfContainers, numberOfContainers); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResponse); + Assert.assertTrue(finishResponse.getIsUnregistered()); + + return null; + }); + + Assert.assertEquals(0, interceptor.getRetryCount()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index a15587442836f3..070131ef39aacb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -55,6 +55,7 @@ public class TestableFederationInterceptor extends FederationInterceptor { private MockResourceManagerFacade mockRm; private boolean isClientRPC = false; + private int retryCount = 0; public TestableFederationInterceptor() { } @@ -258,6 +259,24 @@ protected T createRMProxy(Class protocol, Configuration config, } } + @Override + protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster(YarnConfiguration config, + String subClusterId, ApplicationId applicationId) throws IOException, YarnException { + if (retryCount > 0) { + retryCount--; + throw new 
YarnException("launchUAMAndRegisterApplicationMaster will retry"); + } + return super.launchUAMAndRegisterApplicationMaster(config, subClusterId, applicationId); + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public int getRetryCount() { + return retryCount; + } + /** * Wrap the handler thread, so it calls from the same user. */ From 680af873777f7fce0224781b9ecc547f8f8ded77 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 12 Jul 2023 09:47:59 +0800 Subject: [PATCH 8/8] YARN-11515. [Federation] Improve DefaultRequestInterceptor#init Code. (#5752) --- .../DefaultClientRequestInterceptor.java | 28 +++++++------- .../DefaultRMAdminRequestInterceptor.java | 37 ++++--------------- .../webapp/DefaultRequestInterceptorREST.java | 1 + .../MockDefaultRequestInterceptorREST.java | 20 +++++----- 4 files changed, 34 insertions(+), 52 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java index 8defc73b3ea278..e7cc024b64001e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java @@ -98,6 +98,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.classification.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Extends the {@code AbstractRequestInterceptorClient} class and provides an @@ -107,25 +109,26 @@ */ public class DefaultClientRequestInterceptor extends AbstractClientRequestInterceptor { + private static final Logger LOG = + LoggerFactory.getLogger(DefaultClientRequestInterceptor.class); private ApplicationClientProtocol clientRMProxy; @Override public void init(String userName) { super.init(userName); - - final Configuration conf = this.getConf(); try { - clientRMProxy = - user.doAs(new PrivilegedExceptionAction() { - @Override - public ApplicationClientProtocol run() throws Exception { - return ClientRMProxy.createRMProxy(conf, - ApplicationClientProtocol.class); - } - }); + final Configuration conf = this.getConf(); + clientRMProxy = user.doAs( + (PrivilegedExceptionAction) () -> + ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class)); } catch (Exception e) { - throw new YarnRuntimeException( - "Unable to create the interface to reach the YarnRM", e); + StringBuilder message = new StringBuilder(); + message.append("Error while creating Router RMClient Service"); + if (user != null) { + message.append(", user: " + user); + } + LOG.error(message.toString(), e); + throw new YarnRuntimeException(message.toString(), e); } } @@ -355,6 +358,5 @@ public GetNodesToAttributesResponse getNodesToAttributes( @VisibleForTesting public void setRMClient(ApplicationClientProtocol clientRM) { this.clientRMProxy = clientRM; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
index 8388af66d288ed..df5b4d5835df9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
@@ -80,38 +80,18 @@ public class DefaultRMAdminRequestInterceptor
   public void init(String userName) {
     super.init(userName);
     try {
-      // Do not create a proxy user if user name matches the user name on
-      // current UGI
-      if (UserGroupInformation.isSecurityEnabled()) {
-        user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
-      } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
-        user = UserGroupInformation.getCurrentUser();
-      } else {
-        user = UserGroupInformation.createProxyUser(userName,
-            UserGroupInformation.getCurrentUser());
-      }
-
       final Configuration conf = this.getConf();
       rmAdminProxy = user.doAs(
-          new PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>() {
-            @Override
-            public ResourceManagerAdministrationProtocol run()
-                throws Exception {
-              return ClientRMProxy.createRMProxy(conf,
-                  ResourceManagerAdministrationProtocol.class);
-            }
-          });
-    } catch (IOException e) {
-      String message = "Error while creating Router RMAdmin Service for user:";
+          (PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>) () ->
+              ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class));
+    } catch (Exception e) {
+      StringBuilder message = new StringBuilder();
+      message.append("Error while creating Router RMAdmin Service");
       if (user != null) {
-        message += ", user: " + user;
+        message.append(", user: " + user);
       }
-
-      LOG.info(message);
-      throw new YarnRuntimeException(message, e);
-    } catch (Exception e) {
-      throw new YarnRuntimeException(e);
+      LOG.error(message.toString(), e);
+      throw new YarnRuntimeException(message.toString(), e);
     }
   }
 
@@ -126,7 +106,6 @@ public void setNextInterceptor(RMAdminRequestInterceptor next) {
 
   @VisibleForTesting
   public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) {
     this.rmAdminProxy = rmAdmin;
-
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
index 918865da4675e7..9d3e3be6f6e53d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
@@ -103,6 +103,7 @@ protected SubClusterId getSubClusterId() {
 
   @Override
   public void init(String user) {
+    super.init(user);
     webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(getConf());
     client = RouterWebServiceUtil.createJerseyClient(getConf());
   }
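Both init() methods now converge on a single catch (Exception) path: build the message, log it together with the stack trace via LOG.error, and rethrow wrapped, where the old RMAdmin code logged at INFO without the cause and handled IOException separately. A condensed sketch of that wrap-log-rethrow shape, assuming nothing beyond SLF4J; the helper class is illustrative and RuntimeException stands in for YarnRuntimeException:

    import java.util.concurrent.Callable;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative: the unified wrap-log-rethrow shape used by both init() methods above.
    class InitErrorHandling {
      private static final Logger LOG = LoggerFactory.getLogger(InitErrorHandling.class);

      static <T> T wrapInitFailure(Callable<T> action, String who) {
        try {
          return action.call();
        } catch (Exception e) {
          StringBuilder message = new StringBuilder("Error while creating Router service");
          if (who != null) {
            message.append(", user: ").append(who);
          }
          LOG.error(message.toString(), e);                   // log message plus stack trace
          throw new RuntimeException(message.toString(), e);  // rethrow with the cause attached
        }
      }
    }

Logging the throwable as the second LOG.error argument is what preserves the stack trace; the old LOG.info(message) call dropped it.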
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index d4e1b5145cf0dc..f162ab6be65935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -211,7 +211,7 @@ public Response createNewApplication(HttpServletRequest hsr)
     validateRunning();
 
     ApplicationId applicationId =
-        ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
+        ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
             applicationCounter.incrementAndGet());
     NewApplication appId =
         new NewApplication(applicationId.toString(), new ResourceInfo());
@@ -275,7 +275,7 @@ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
 
     AppInfo appInfo = new AppInfo();
     appInfo.setAppId(
-        ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
+        ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
             applicationCounter.incrementAndGet()).toString());
     appInfo.setAMHostHttpAddress("http://i_am_the_AM:1234");
 
@@ -316,7 +316,7 @@ public NodeInfo getNode(String nodeId) {
     if (nodeId.contains(subClusterId) || nodeId.contains("test")) {
       node = new NodeInfo();
       node.setId(nodeId);
-      node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
+      node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
     }
     return node;
   }
@@ -328,7 +328,7 @@ public NodesInfo getNodes(String states) {
     }
     NodeInfo node = new NodeInfo();
     node.setId("Node " + Integer.valueOf(getSubClusterId().getId()));
-    node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
+    node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
     NodesInfo nodes = new NodesInfo();
     nodes.add(node);
     return nodes;
@@ -350,12 +350,12 @@ public ClusterMetricsInfo getClusterMetricsInfo() {
       throw new RuntimeException("RM is stopped");
     }
     ClusterMetricsInfo metrics = new ClusterMetricsInfo();
-    metrics.setAppsSubmitted(Integer.valueOf(getSubClusterId().getId()));
-    metrics.setAppsCompleted(Integer.valueOf(getSubClusterId().getId()));
-    metrics.setAppsPending(Integer.valueOf(getSubClusterId().getId()));
-    metrics.setAppsRunning(Integer.valueOf(getSubClusterId().getId()));
-    metrics.setAppsFailed(Integer.valueOf(getSubClusterId().getId()));
-    metrics.setAppsKilled(Integer.valueOf(getSubClusterId().getId()));
+    metrics.setAppsSubmitted(Integer.parseInt(getSubClusterId().getId()));
+    metrics.setAppsCompleted(Integer.parseInt(getSubClusterId().getId()));
+    metrics.setAppsPending(Integer.parseInt(getSubClusterId().getId()));
+    metrics.setAppsRunning(Integer.parseInt(getSubClusterId().getId()));
+    metrics.setAppsFailed(Integer.parseInt(getSubClusterId().getId()));
+    metrics.setAppsKilled(Integer.parseInt(getSubClusterId().getId()));
     return metrics;
   }
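The Integer.valueOf to Integer.parseInt swap in the mock is not purely cosmetic: every call site feeds a primitive-typed setter, so valueOf would parse into a boxed Integer only for it to be unboxed immediately, while parseInt returns the primitive directly. A two-line illustration:

    public class ParseVsValueOf {
      public static void main(String[] args) {
        String id = "2";
        int a = Integer.parseInt(id); // returns the primitive int directly
        int b = Integer.valueOf(id);  // parses to a boxed Integer, then auto-unboxes
        System.out.println(a + b);    // both yield 2; parseInt just skips the boxing step
      }
    }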