From 660530205e6134a938b4a18de35b9d45ec25f798 Mon Sep 17 00:00:00 2001 From: Takanobu Asanuma Date: Mon, 7 Nov 2022 10:49:48 +0900 Subject: [PATCH 1/7] HDFS-16833. NameNode should log internal EC blocks instead of the EC block group when it receives block reports. (#5106) Reviewed-by: Tao Li --- .../hdfs/server/blockmanagement/BlockManager.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index dfe48f7bde1f4..ded489308326f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -3616,7 +3616,7 @@ private Block addStoredBlock(final BlockInfo block, if (storedBlock == null || storedBlock.isDeleted()) { // If this block does not belong to anyfile, then we are done. blockLog.debug("BLOCK* addStoredBlock: {} on {} size {} but it does not belong to any file", - block, node, block.getNumBytes()); + reportedBlock, node, reportedBlock.getNumBytes()); // we could add this block to invalidate set of this datanode. // it will happen in next block report otherwise. return block; @@ -3631,12 +3631,12 @@ private Block addStoredBlock(final BlockInfo block, (node.isDecommissioned() || node.isDecommissionInProgress()) ? 0 : 1; if (logEveryBlock) { blockLog.info("BLOCK* addStoredBlock: {} is added to {} (size={})", - node, storedBlock, storedBlock.getNumBytes()); + node, reportedBlock, reportedBlock.getNumBytes()); } } else if (result == AddBlockResult.REPLACED) { curReplicaDelta = 0; blockLog.warn("BLOCK* addStoredBlock: block {} moved to storageType " + - "{} on node {}", storedBlock, storageInfo.getStorageType(), node); + "{} on node {}", reportedBlock, storageInfo.getStorageType(), node); } else { // if the same block is added again and the replica was corrupt // previously because of a wrong gen stamp, remove it from the @@ -3646,8 +3646,8 @@ private Block addStoredBlock(final BlockInfo block, curReplicaDelta = 0; if (blockLog.isDebugEnabled()) { blockLog.debug("BLOCK* addStoredBlock: Redundant addStoredBlock request" - + " received for {} on node {} size {}", storedBlock, node, - storedBlock.getNumBytes()); + + " received for {} on node {} size {}", reportedBlock, node, + reportedBlock.getNumBytes()); } } From 44b8bb7224f296c5202b3a8cbad937875b01a270 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Mon, 7 Nov 2022 08:43:04 -0800 Subject: [PATCH 2/7] HDFS-16821: Fixes regression in HDFS-13522 that enables observer reads by default (#5078) --- .../hadoop/hdfs/NameNodeProxiesClient.java | 3 - .../federation/MiniRouterDFSCluster.java | 15 +++ .../router/TestObserverWithRouter.java | 107 ++++++++++-------- 3 files changed, 77 insertions(+), 48 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java index 4acec82824238..aa9577330cfae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java @@ -349,9 +349,6 @@ public static 
ClientProtocol createProxyWithAlignmentContext( boolean withRetries, AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) throws IOException { - if (alignmentContext == null) { - alignmentContext = new ClientGSIContext(); - } RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index 4fcdf6595e4ad..bdf4697d2aa92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.net.NetUtils; @@ -233,6 +234,20 @@ public FileSystem getFileSystem() throws IOException { return DistributedFileSystem.get(conf); } + public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException { + Configuration observerReadConf = new Configuration(conf); + observerReadConf.set(DFS_NAMESERVICES, + observerReadConf.get(DFS_NAMESERVICES)+ ",router-service"); + observerReadConf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1"); + observerReadConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1", + getFileSystemURI().toString()); + observerReadConf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + + "." 
+ "router-service", ObserverReadProxyProvider.class.getName()); + DistributedFileSystem.setDefaultUri(observerReadConf, "hdfs://router-service"); + + return DistributedFileSystem.get(observerReadConf); + } + public DFSClient getClient(UserGroupInformation user) throws IOException, URISyntaxException, InterruptedException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index fbd731c073f4b..23095186d0133 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThrows; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; @@ -41,15 +42,40 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.junit.After; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; -public class TestObserverWithRouter { +public class TestObserverWithRouter { + private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup"; private MiniRouterDFSCluster cluster; + private RouterContext routerContext; + private FileSystem fileSystem; - public void startUpCluster(int numberOfObserver) throws Exception { - startUpCluster(numberOfObserver, null); + @BeforeEach + void init(TestInfo info) throws Exception { + if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) { + return; + } + startUpCluster(2, null); + } + + @AfterEach + public void teardown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + + routerContext = null; + + if (fileSystem != null) { + fileSystem.close(); + fileSystem = null; + } } public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception { @@ -95,31 +121,39 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th cluster.installMockLocations(); cluster.waitActiveNamespaces(); + routerContext = cluster.getRandomRouter(); + fileSystem = routerContext.getFileSystemWithObserverReadsEnabled(); } - @After - public void teardown() throws IOException { - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } + @Test + public void testObserverRead() throws Exception { + internalTestObserverRead(); } + /** + * Tests that without adding config to use ObserverProxyProvider, the client shouldn't + * have reads served by Observers. + * Fixes regression in HDFS-13522. 
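+   * (The HDFS-13522 change had enabled observer reads by default for every router client.)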
+ */ @Test - public void testObserverRead() throws Exception { - startUpCluster(1); - RouterContext routerContext = cluster.getRandomRouter(); + public void testReadWithoutObserverClientConfigurations() throws Exception { + fileSystem.close(); + fileSystem = routerContext.getFileSystem(); + assertThrows(AssertionError.class, this::internalTestObserverRead); + } + + public void internalTestObserverRead() + throws Exception { List namenodes = routerContext .getRouter().getNamenodeResolver() .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); assertEquals("First namenode should be observer", namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); - // Send Create call to active + // Send create call fileSystem.create(path).close(); - // Send read request to observer + // Send read request fileSystem.open(path).close(); long rpcCountForActive = routerContext.getRouter().getRpcServer() @@ -131,21 +165,19 @@ public void testObserverRead() throws Exception { .getRPCMetrics().getObserverProxyOps(); // getBlockLocations should be sent to observer assertEquals("One call should be sent to observer", 1, rpcCountForObserver); - fileSystem.close(); } @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) public void testObserverReadWithoutFederatedStatePropagation() throws Exception { Configuration confOverrides = new Configuration(false); confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0); - startUpCluster(1, confOverrides); - RouterContext routerContext = cluster.getRandomRouter(); + startUpCluster(2, confOverrides); List namenodes = routerContext .getRouter().getNamenodeResolver() .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); assertEquals("First namenode should be observer", namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); // Send Create call to active fileSystem.create(path).close(); @@ -161,22 +193,19 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception long rpcCountForObserver = routerContext.getRouter().getRpcServer() .getRPCMetrics().getObserverProxyOps(); assertEquals("No call should be sent to observer", 0, rpcCountForObserver); - fileSystem.close(); } @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) public void testDisablingObserverReadUsingNameserviceOverride() throws Exception { // Disable observer reads using per-nameservice override Configuration confOverrides = new Configuration(false); confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0"); - startUpCluster(1, confOverrides); + startUpCluster(2, confOverrides); - RouterContext routerContext = cluster.getRandomRouter(); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); fileSystem.create(path).close(); fileSystem.open(path).close(); - fileSystem.close(); long rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); @@ -190,17 +219,15 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception @Test public void testReadWhenObserverIsDown() throws Exception { - startUpCluster(1); - RouterContext routerContext = cluster.getRandomRouter(); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile1"); // Send Create call to active fileSystem.create(path).close(); 
// Stop observer NN int nnIndex = stopObserver(1); - assertNotEquals("No observer found", 3, nnIndex); + nnIndex = stopObserver(1); + assertNotEquals("No observer found", 4, nnIndex); // Send read request fileSystem.open(path).close(); @@ -215,14 +242,10 @@ public void testReadWhenObserverIsDown() throws Exception { .getRPCMetrics().getObserverProxyOps(); assertEquals("No call should send to observer", 0, rpcCountForObserver); - fileSystem.close(); } @Test public void testMultipleObserver() throws Exception { - startUpCluster(2); - RouterContext routerContext = cluster.getRandomRouter(); - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile1"); // Send Create call to active fileSystem.create(path).close(); @@ -267,7 +290,6 @@ public void testMultipleObserver() throws Exception { .getRpcServer().getRPCMetrics().getObserverProxyOps(); assertEquals("No call should send to observer", expectedObserverRpc, rpcCountForObserver); - fileSystem.close(); } private int stopObserver(int num) { @@ -288,9 +310,9 @@ private int stopObserver(int num) { // test router observer with multiple to know which observer NN received // requests @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) public void testMultipleObserverRouter() throws Exception { StateStoreDFSCluster innerCluster; - RouterContext routerContext; MembershipNamenodeResolver resolver; String ns0; @@ -356,14 +378,12 @@ public void testMultipleObserverRouter() throws Exception { namespaceInfo0.get(1).getNamenodeId()); assertEquals(namespaceInfo1.get(0).getState(), FederationNamenodeServiceState.OBSERVER); + + innerCluster.shutdown(); } @Test public void testUnavailableObserverNN() throws Exception { - startUpCluster(2); - RouterContext routerContext = cluster.getRandomRouter(); - FileSystem fileSystem = routerContext.getFileSystem(); - stopObserver(2); Path path = new Path("/testFile"); @@ -397,12 +417,10 @@ public void testUnavailableObserverNN() throws Exception { assertTrue("There must be unavailable namenodes", hasUnavailable); } + + @Test public void testRouterMsync() throws Exception { - startUpCluster(1); - RouterContext routerContext = cluster.getRandomRouter(); - - FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); // Send Create call to active @@ -420,6 +438,5 @@ public void testRouterMsync() throws Exception { // 2 msync calls should be sent. One to each active namenode in the two namespaces. assertEquals("Four calls should be sent to active", 4, rpcCountForActive); - fileSystem.close(); } } \ No newline at end of file From 845cf8bc2868bd2c990b3b077caa24a0b7971211 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Tue, 8 Nov 2022 02:13:23 +0800 Subject: [PATCH 3/7] YARN-11368. [Federation] Improve Yarn Router's Federation Page style. 
(#5105)
---
 .../webapps/static/federation/federation.js   | 13 +++---
 .../server/router/webapp/FederationBlock.java | 40 ++++++++++++++-----
 .../router/webapp/TestFederationWebApp.java   | 13 ++++++
 3 files changed, 50 insertions(+), 16 deletions(-)

[The HTML tag content inside the string literals in the two hunks below was lost in extraction; '…' marks the missing fragments.]

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/static/federation/federation.js b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/static/federation/federation.js
index b3b1e9d392c85..ac8e3efe11e9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/static/federation/federation.js
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/static/federation/federation.js
@@ -28,7 +28,7 @@ $(document).ready(function() {
     var capabilityArr = scTableData.filter(item => (item.subcluster === row.id()));
     var capabilityObj = JSON.parse(capabilityArr[0].capability).clusterMetrics;
     row.child(
-      '' +
+      '…' +
       '… ' +
       '…Application Metrics…' +
       '…' +
@@ -42,11 +42,12 @@ $(document).ready(function() {
       '…' +
       '…Resource Metrics…' +
       '…Memory…' +
-      ' TotalMB : ' + capabilityObj.totalMB + '…' +
-      ' ReservedMB : ' + capabilityObj.reservedMB + '…' +
-      ' AvailableMB : ' + capabilityObj.availableMB + '…' +
-      ' AllocatedMB : ' + capabilityObj.allocatedMB + '…' +
-      ' PendingMB : ' + capabilityObj.pendingMB + '…' +
+      ' Total Memory : ' + capabilityArr[0].totalmemory + '…' +
+      ' Reserved Memory : ' + capabilityArr[0].reservedmemory + '…' +
+      ' Available Memory : ' + capabilityArr[0].availablememory + '…' +
+      ' Allocated Memory : ' + capabilityArr[0].allocatedmemory + '…' +
+      ' Pending Memory : ' + capabilityArr[0].pendingmemory + '…' +
+      '…' +
       '…VirtualCores…' +
       ' TotalVirtualCores : ' + capabilityObj.totalVirtualCores + '…' +
       ' ReservedVirtualCores : ' + capabilityObj.reservedVirtualCores + '…' +
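The FederationBlock.java hunk below moves the memory formatting server-side with StringUtils.byteDesc, so the page receives ready-made strings. A minimal sketch of that conversion (the demo class and values are illustrative; byteDesc is the real Hadoop utility):

    import org.apache.hadoop.util.StringUtils;

    public final class ByteDescDemo {
      public static void main(String[] args) {
        long totalMB = 8192;                 // illustrative raw metric value
        long bytesInMB = 1024L * 1024L;      // mirrors BYTES_IN_MB in FederationBlock
        // byteDesc renders a byte count as the human-readable string the page now shows.
        System.out.println(StringUtils.byteDesc(totalMB * bytesInMB)); // e.g. "8 GB"
      }
    }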
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationBlock.java
index f80442714f352..44f8d7407bf20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationBlock.java
@@ -23,12 +23,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Date;

 import com.google.gson.Gson;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.apache.hadoop.yarn.server.router.Router;
 import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
@@ -149,32 +150,51 @@ private void initHtmlPageFederation(Block html, boolean isEnabled) {
       ClusterMetricsInfo subClusterInfo = getClusterMetricsInfo(capability);

       // Prepare LastStartTime & LastHeartBeat
-      String lastStartTime =
-          DateFormatUtils.format(subcluster.getLastStartTime(), DATE_PATTERN);
-      String lastHeartBeat =
-          DateFormatUtils.format(subcluster.getLastHeartBeat(), DATE_PATTERN);
+      Date lastStartTime = new Date(subcluster.getLastStartTime());
+      Date lastHeartBeat = new Date(subcluster.getLastHeartBeat());

       // Prepare Resource
       long totalMB = subClusterInfo.getTotalMB();
       String totalMBDesc = StringUtils.byteDesc(totalMB * BYTES_IN_MB);
       long totalVirtualCores = subClusterInfo.getTotalVirtualCores();
-      String resources = String.format("", totalMBDesc, totalVirtualCores);
+      String resources = String.format("", totalMBDesc, totalVirtualCores);

       // Prepare Node
       long totalNodes = subClusterInfo.getTotalNodes();
       long activeNodes = subClusterInfo.getActiveNodes();
-      String nodes = String.format("", totalNodes, activeNodes);
+      String nodes = String.format("", totalNodes, activeNodes);

       // Prepare HTML Table
+      String stateStyle = "color:#dc3545;font-weight:bolder";
+      SubClusterState state = subcluster.getState();
+      if (SubClusterState.SC_RUNNING == state) {
+        stateStyle = "color:#28a745;font-weight:bolder";
+      }
+
       tbody.tr().$id(subClusterIdText)
           .td().$class("details-control").a(herfWebAppAddress, subClusterIdText).__()
-          .td(subcluster.getState().name())
-          .td(lastStartTime)
-          .td(lastHeartBeat)
+          .td().$style(stateStyle).__(state.name()).__()
+          .td().__(lastStartTime).__()
+          .td().__(lastHeartBeat).__()
           .td(resources)
           .td(nodes)
           .__();

+      // Formatted memory information
+      long allocatedMB = subClusterInfo.getAllocatedMB();
+      String allocatedMBDesc = StringUtils.byteDesc(allocatedMB * BYTES_IN_MB);
+      long availableMB = subClusterInfo.getAvailableMB();
+      String availableMBDesc = StringUtils.byteDesc(availableMB * BYTES_IN_MB);
+      long pendingMB = subClusterInfo.getPendingMB();
+      String pendingMBDesc = StringUtils.byteDesc(pendingMB * BYTES_IN_MB);
+      long reservedMB = subClusterInfo.getReservedMB();
+      String reservedMBDesc = StringUtils.byteDesc(reservedMB *
BYTES_IN_MB); + + subclusterMap.put("totalmemory", totalMBDesc); + subclusterMap.put("allocatedmemory", allocatedMBDesc); + subclusterMap.put("availablememory", availableMBDesc); + subclusterMap.put("pendingmemory", pendingMBDesc); + subclusterMap.put("reservedmemory", reservedMBDesc); subclusterMap.put("subcluster", subClusterId.getId()); subclusterMap.put("capability", capability); lists.add(subclusterMap); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationWebApp.java index 4ec482c615a58..f1501fe1e7a24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationWebApp.java @@ -23,14 +23,20 @@ import org.apache.hadoop.yarn.server.router.Router; import org.apache.hadoop.yarn.webapp.test.WebAppTests; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; public class TestFederationWebApp extends TestRouterWebServicesREST { + private static final Logger LOG = + LoggerFactory.getLogger(TestFederationWebApp.class); + @Test public void testFederationWebViewNotEnable() throws InterruptedException, YarnException, IOException { + LOG.info("testFederationWebView - NotEnable Federation."); // Test Federation is not Enabled Configuration config = new YarnConfiguration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, false); @@ -40,6 +46,7 @@ public void testFederationWebViewNotEnable() @Test public void testFederationWebViewEnable() throws InterruptedException, YarnException, IOException { + LOG.info("testFederationWebView - Enable Federation."); // Test Federation Enabled Configuration config = new YarnConfiguration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); @@ -49,6 +56,7 @@ public void testFederationWebViewEnable() @Test public void testFederationAboutViewEnable() throws InterruptedException, YarnException, IOException { + LOG.info("testFederationAboutViewEnable - Enable Federation."); // Test Federation Enabled Configuration config = new YarnConfiguration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); @@ -58,6 +66,7 @@ public void testFederationAboutViewEnable() @Test public void testFederationAboutViewNotEnable() throws InterruptedException, YarnException, IOException { + LOG.info("testFederationAboutViewNotEnable - NotEnable Federation."); // Test Federation Not Enabled Configuration config = new YarnConfiguration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, false); @@ -67,6 +76,7 @@ public void testFederationAboutViewNotEnable() @Test public void testFederationNodeViewEnable() throws InterruptedException, YarnException, IOException { + LOG.info("testFederationNodeViewEnable - Enable Federation."); // Test Federation Enabled Configuration config = new YarnConfiguration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); @@ -76,6 +86,7 @@ public void testFederationNodeViewEnable() @Test public void testFederationNodeViewNotEnable() throws InterruptedException, YarnException, IOException { + 
LOG.info("testFederationNodeViewNotEnable - NotEnable Federation."); // Test Federation Not Enabled Configuration config = new YarnConfiguration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, false); @@ -85,6 +96,7 @@ public void testFederationNodeViewNotEnable() @Test public void testFederationAppViewEnable() throws InterruptedException, YarnException, IOException { + LOG.info("testFederationAppViewEnable - Enable Federation."); // Test Federation Enabled Configuration config = new YarnConfiguration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); @@ -94,6 +106,7 @@ public void testFederationAppViewEnable() @Test public void testFederationAppViewNotEnable() throws InterruptedException, YarnException, IOException { + LOG.info("testFederationAppViewNotEnable - NotEnable Federation."); // Test Federation Not Enabled Configuration config = new YarnConfiguration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, false);

From 7f9ca101e2ae057a42829883596085732f8d5fa6 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 8 Nov 2022 11:43:04 +0000 Subject: [PATCH 4/7] HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead (#5103) * HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead Adds new config option to turn off readahead * also allows it to be passed in through openFile(), * extends ITestAbfsReadWriteAndSeek to use the option, including one replicated test...that shows that turning it off is slower. Important: this does not address the critical data corruption issue HADOOP-18521. ABFS ReadBufferManager buffer sharing across concurrent HTTP requests What it does do is provide a way to completely bypass the ReadBufferManager. To mitigate the problem, either fs.azure.enable.readahead needs to be set to false, or "fs.azure.readaheadqueue.depth" set to 0; this still goes near the (broken) ReadBufferManager code, but doesn't trigger the bug.
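A hedged sketch of applying either mitigation through the standard org.apache.hadoop.conf.Configuration API (the helper class and method names are illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;

    public final class AbfsReadaheadMitigations {
      // Returns a configuration with both mitigations applied; either one alone suffices.
      public static Configuration apply(Configuration conf) {
        // Mitigation 1 (this patch's new switch): bypass the ReadBufferManager entirely.
        conf.setBoolean("fs.azure.enable.readahead", false);
        // Mitigation 2: keep readahead enabled but never queue prefetches;
        // this still runs near the broken ReadBufferManager code.
        conf.setInt("fs.azure.readaheadqueue.depth", 0);
        return conf;
      }
    }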
For safe reading of files through the ABFS connector, readahead MUST be disabled or the followup fix to HADOOP-18521 applied Contributed by Steve Loughran --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 14 ++++++++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 1 + .../azurebfs/constants/ConfigurationKeys.java | 7 ++++ .../constants/FileSystemConfigurations.java | 1 + .../fs/azurebfs/services/AbfsInputStream.java | 7 +++- .../services/AbfsInputStreamContext.java | 12 +++++++ .../azurebfs/ITestAbfsReadWriteAndSeek.java | 32 +++++++++++++------ .../fs/azurebfs/TestTracingContext.java | 4 +-- 8 files changed, 66 insertions(+), 12 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index fafc30372b4a5..ecfeb41a34697 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -302,6 +302,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ABFS_LATENCY_TRACK) private boolean trackLatency; + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = FS_AZURE_ENABLE_READAHEAD, + DefaultValue = DEFAULT_ENABLE_READAHEAD) + private boolean enabledReadAhead; + @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS, MinValue = 0, DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS) @@ -915,6 +920,15 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio } } + public boolean isReadAheadEnabled() { + return this.enabledReadAhead; + } + + @VisibleForTesting + void setReadAheadEnabled(final boolean enabledReadAhead) { + this.enabledReadAhead = enabledReadAhead; + } + public int getReadAheadRange() { return this.readAheadRange; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 11397e03e5c5b..e5e7056126564 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -808,6 +808,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled()) .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely()) .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead()) .withReadAheadRange(abfsConfiguration.getReadAheadRange()) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 9d3b2d5e82c6e..0353f3e01ffb1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -186,6 +186,13 @@ public final class ConfigurationKeys { public static final String 
FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement"; public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider"; public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; + + /** + * Enable or disable readahead buffer in AbfsInputStream. + * Value: {@value}. + */ + public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead"; + /** Setting this true will make the driver use it's own RemoteIterator implementation */ public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator"; /** Server side encryption key */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 63d62a33b1819..42f3b7503e03d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -106,6 +106,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false; public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; + public static final boolean DEFAULT_ENABLE_READAHEAD = true; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 553ccdcbc0a43..8f12484a55c9d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -137,7 +137,7 @@ public AbfsInputStream( this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; this.readAheadRange = abfsInputStreamContext.getReadAheadRange(); - this.readAheadEnabled = true; + this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled(); this.alwaysReadBufferSize = abfsInputStreamContext.shouldReadBufferSizeAlways(); this.bufferedPreadDisabled = abfsInputStreamContext @@ -745,6 +745,11 @@ byte[] getBuffer() { return buffer; } + @VisibleForTesting + public boolean isReadAheadEnabled() { + return readAheadEnabled; + } + @VisibleForTesting public int getReadAheadRange() { return readAheadRange; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index ae69cde6efac1..e258958b1a111 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -35,6 +35,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean tolerateOobAppends; + private boolean isReadAheadEnabled = true; + private boolean alwaysReadBufferSize; private int readAheadBlockSize; @@ -72,6 +74,12 @@ public AbfsInputStreamContext 
withTolerateOobAppends( return this; } + public AbfsInputStreamContext isReadAheadEnabled( + final boolean isReadAheadEnabled) { + this.isReadAheadEnabled = isReadAheadEnabled; + return this; + } + public AbfsInputStreamContext withReadAheadRange( final int readAheadRange) { this.readAheadRange = readAheadRange; @@ -141,6 +149,10 @@ public boolean isTolerateOobAppends() { return tolerateOobAppends; } + public boolean isReadAheadEnabled() { + return isReadAheadEnabled; + } + public int getReadAheadRange() { return readAheadRange; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index 5bd6eaff42e84..beada775ae87b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; -import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO; @@ -40,6 +39,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; /** * Test read, write and seek. @@ -50,18 +50,27 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest { private static final String TEST_PATH = "/testfile"; - @Parameterized.Parameters(name = "Size={0}") + /** + * Parameterize on read buffer size and readahead. + * For test performance, a full x*y test matrix is not used. 
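+   * Only a representative subset of buffer-size and readahead combinations is run.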
+ * @return the test parameters + */ + @Parameterized.Parameters(name = "Size={0}-readahead={1}") public static Iterable sizes() { - return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE}, - {DEFAULT_READ_BUFFER_SIZE}, - {APPENDBLOB_MAX_WRITE_BUFFER_SIZE}, - {MAX_BUFFER_SIZE}}); + return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE, true}, + {DEFAULT_READ_BUFFER_SIZE, false}, + {DEFAULT_READ_BUFFER_SIZE, true}, + {APPENDBLOB_MAX_WRITE_BUFFER_SIZE, false}, + {MAX_BUFFER_SIZE, true}}); } private final int size; + private final boolean readaheadEnabled; - public ITestAbfsReadWriteAndSeek(final int size) throws Exception { + public ITestAbfsReadWriteAndSeek(final int size, + final boolean readaheadEnabled) throws Exception { this.size = size; + this.readaheadEnabled = readaheadEnabled; } @Test @@ -74,6 +83,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); abfsConfiguration.setWriteBufferSize(bufferSize); abfsConfiguration.setReadBufferSize(bufferSize); + abfsConfiguration.setReadAheadEnabled(readaheadEnabled); final byte[] b = new byte[2 * bufferSize]; new Random().nextBytes(b); @@ -85,7 +95,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { } finally{ stream.close(); } - IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); final byte[] readBuffer = new byte[2 * bufferSize]; int result; @@ -109,7 +119,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); } - IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource); assertNotEquals("data read in final read()", -1, result); assertArrayEquals(readBuffer, b); @@ -121,6 +131,7 @@ public void testReadAheadRequestID() throws java.io.IOException { final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); int bufferSize = MIN_BUFFER_SIZE; abfsConfiguration.setReadBufferSize(bufferSize); + abfsConfiguration.setReadAheadEnabled(readaheadEnabled); final byte[] b = new byte[bufferSize * 10]; new Random().nextBytes(b); @@ -132,8 +143,10 @@ public void testReadAheadRequestID() throws java.io.IOException { ((AbfsOutputStream) stream.getWrappedStream()) .getStreamID())); stream.write(b); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); } + final byte[] readBuffer = new byte[4 * bufferSize]; int result; fs.registerListener( @@ -146,6 +159,7 @@ public void testReadAheadRequestID() throws java.io.IOException { ((AbfsInputStream) inputStream.getWrappedStream()) .getStreamID())); result = inputStream.read(readBuffer, 0, bufferSize*4); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, inputStream); } fs.registerListener(null); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java index cf1a89dd1eaba..b91a3e2208b57 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java @@ -130,10 +130,10 @@ public void runCorrelationTestForAllMethods() throws Exception { 
testClasses.put(new ITestAzureBlobFileSystemListStatus(), //liststatus ITestAzureBlobFileSystemListStatus.class.getMethod("testListPath")); - testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //open, + testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, true), //open, // read, write ITestAbfsReadWriteAndSeek.class.getMethod("testReadAheadRequestID")); - testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //read (bypassreadahead) + testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, false), //read (bypassreadahead) ITestAbfsReadWriteAndSeek.class .getMethod("testReadAndWriteWithDifferentBufferSizesAndSeek")); testClasses.put(new ITestAzureBlobFileSystemAppend(), //append From 7002e214b8861e007a0279a8fb5dfedad3ffe3c4 Mon Sep 17 00:00:00 2001 From: ted12138 <67770300+ted12138@users.noreply.github.com> Date: Wed, 9 Nov 2022 10:21:43 +0800 Subject: [PATCH 5/7] HADOOP-18502. MutableStat should return 0 when there is no change (#5058) --- .../hadoop/metrics2/lib/MutableStat.java | 10 ++++----- .../metrics2/lib/TestMutableMetrics.java | 21 +++++++++++++++++++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java index f2e072545ad28..b130aa6ada398 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java @@ -140,14 +140,14 @@ public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) { if (all || changed()) { numSamples += intervalStat.numSamples(); builder.addCounter(numInfo, numSamples) - .addGauge(avgInfo, lastStat().mean()); + .addGauge(avgInfo, intervalStat.mean()); if (extended) { - builder.addGauge(stdevInfo, lastStat().stddev()) - .addGauge(iMinInfo, lastStat().min()) - .addGauge(iMaxInfo, lastStat().max()) + builder.addGauge(stdevInfo, intervalStat.stddev()) + .addGauge(iMinInfo, intervalStat.min()) + .addGauge(iMaxInfo, intervalStat.max()) .addGauge(minInfo, minMax.min()) .addGauge(maxInfo, minMax.max()) - .addGauge(iNumInfo, lastStat().numSamples()); + .addGauge(iNumInfo, intervalStat.numSamples()); } if (changed()) { if (numSamples > 0) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java index 10c8057c69e04..0938aa92a90d1 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java @@ -290,6 +290,27 @@ private static void snapshotMutableRatesWithAggregation( } } + /** + * MutableStat should output 0 instead of the previous state when there is no change. 
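+   * The per-interval gauges (average, stdev, imin, imax, inum) must reset to 0 rather than repeat the previous interval's values.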
+ */ + @Test public void testMutableWithoutChanged() { + MetricsRecordBuilder builderWithChange = mockMetricsRecordBuilder(); + MetricsRecordBuilder builderWithoutChange = mockMetricsRecordBuilder(); + MetricsRegistry registry = new MetricsRegistry("test"); + MutableStat stat = registry.newStat("Test", "Test", "Ops", "Val", true); + stat.add(1000, 1000); + stat.add(1000, 2000); + registry.snapshot(builderWithChange, true); + + assertCounter("TestNumOps", 2000L, builderWithChange); + assertGauge("TestINumOps", 2000L, builderWithChange); + assertGauge("TestAvgVal", 1.5, builderWithChange); + + registry.snapshot(builderWithoutChange, true); + assertGauge("TestINumOps", 0L, builderWithoutChange); + assertGauge("TestAvgVal", 0.0, builderWithoutChange); + } + @Test public void testDuplicateMetrics() { MutableRatesWithAggregation rates = new MutableRatesWithAggregation(); From f68f1a45783d7eb3f982fadef800b58ab8b763f3 Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Wed, 9 Nov 2022 19:18:31 +0800 Subject: [PATCH 6/7] HADOOP-18433. Fix main thread name for . (#4838) --- .../src/main/java/org/apache/hadoop/ipc/Server.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 17366eb9569f1..e10e7bfd7c17a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -1393,8 +1393,7 @@ private class Listener extends Thread { bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); //Could be an ephemeral port this.listenPort = acceptChannel.socket().getLocalPort(); - Thread.currentThread().setName("Listener at " + - bindAddress + "/" + this.listenPort); + LOG.info("Listener at {}:{}", bindAddress, this.listenPort); // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; From b398a7b003153ad9952d445ea2e899c8fecb7aa5 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Thu, 10 Nov 2022 02:25:10 +0800 Subject: [PATCH 7/7] YARN-11367. [Federation] Fix DefaultRequestInterceptorREST Client NPE. 
(#5100) --- .../AbstractRESTRequestInterceptor.java | 41 ++++++++++++++++++- .../webapp/DefaultRequestInterceptorREST.java | 6 +++ .../webapp/FederationInterceptorREST.java | 6 ++- .../webapp/TestFederationInterceptorREST.java | 19 +++++++++ 4 files changed, 69 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java index f1919c2e5a174..66b2495e2c9b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java @@ -19,6 +19,10 @@ package org.apache.hadoop.yarn.server.router.webapp; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import java.io.IOException; /** * Extends the RequestInterceptor class and provides common functionality which @@ -29,6 +33,7 @@ public abstract class AbstractRESTRequestInterceptor private Configuration conf; private RESTRequestInterceptor nextInterceptor; + private UserGroupInformation user = null; /** * Sets the {@link RESTRequestInterceptor} in the chain. @@ -62,9 +67,10 @@ public Configuration getConf() { * Initializes the {@link RESTRequestInterceptor}. */ @Override - public void init(String user) { + public void init(String userName) { + setupUser(userName); if (this.nextInterceptor != null) { - this.nextInterceptor.init(user); + this.nextInterceptor.init(userName); } } @@ -86,4 +92,35 @@ public RESTRequestInterceptor getNextInterceptor() { return this.nextInterceptor; } + /** + * Set User information. + * + * If the username is empty, we will use the Yarn Router user directly. + * Do not create a proxy user if user name matches the user name on current UGI. + * @param userName userName. 
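+   * In secure mode the proxy user is created from the login user; otherwise from the Router's current user.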
+ */ + private void setupUser(final String userName) { + try { + if (userName == null || userName.isEmpty()) { + user = UserGroupInformation.getCurrentUser(); + } else if (UserGroupInformation.isSecurityEnabled()) { + user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser()); + } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) { + user = UserGroupInformation.getCurrentUser(); + } else { + user = UserGroupInformation.createProxyUser(userName, + UserGroupInformation.getCurrentUser()); + } + } catch (IOException e) { + String message = "Error while creating Router RMAdmin Service for user:"; + if (user != null) { + message += ", user: " + user; + } + throw new YarnRuntimeException(message, e); + } + } + + public UserGroupInformation getUser() { + return user; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java index 9046fb8cc9c65..918865da4675e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java @@ -28,6 +28,7 @@ import javax.ws.rs.core.Response; import com.sun.jersey.api.client.Client; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -603,6 +604,11 @@ public Response signalToContainer(String containerId, String command, null, getConf(), client); } + @VisibleForTesting + public Client getClient() { + return client; + } + @Override public NodeLabelsInfo getRMNodeLabels(HttpServletRequest hsr) { return RouterWebServiceUtil.genericForward(webAppAddress, hsr, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 2db7416524b89..dc3508e0dfaed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -152,6 +152,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { @Override public void init(String user) { + + super.init(user); + federationFacade = FederationStateStoreFacade.getInstance(); rand = new Random(); @@ -239,7 +242,8 @@ private DefaultRequestInterceptorREST createInterceptorForSubCluster( .isAssignableFrom(interceptorClass)) { interceptorInstance = (DefaultRequestInterceptorREST) ReflectionUtils .newInstance(interceptorClass, conf); - + String userName = 
getUser().getUserName(); + interceptorInstance.init(userName); } else { throw new YarnRuntimeException( "Class: " + interceptorClassName + " not instance of " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index c769148d8e4b3..4c50e5198dc03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -1257,4 +1257,23 @@ private HttpServletRequest mockHttpServletRequestByUserName(String username) { when(mockHsr.getUserPrincipal()).thenReturn(principal); return mockHsr; } + + @Test + public void testCheckFederationInterceptorRESTClient() { + SubClusterId subClusterId = SubClusterId.newInstance("SC-1"); + String webAppSocket = "SC-1:WebAddress"; + String webAppAddress = "http://" + webAppSocket; + + Configuration configuration = new Configuration(); + FederationInterceptorREST rest = new FederationInterceptorREST(); + rest.setConf(configuration); + rest.init("router"); + + DefaultRequestInterceptorREST interceptorREST = + rest.getOrCreateInterceptorForSubCluster(subClusterId, webAppSocket); + + Assert.assertNotNull(interceptorREST); + Assert.assertNotNull(interceptorREST.getClient()); + Assert.assertEquals(webAppAddress, interceptorREST.getWebAppAddress()); + } } \ No newline at end of file
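As a closing illustration, the user-resolution rule this last patch adds can be restated as a minimal standalone sketch (the class is illustrative; the real logic is AbstractRESTRequestInterceptor#setupUser above):

    import java.io.IOException;
    import org.apache.hadoop.security.UserGroupInformation;

    final class RouterUserResolver {
      static UserGroupInformation resolve(String userName) throws IOException {
        if (userName == null || userName.isEmpty()) {
          // No remote user: fall back to the Yarn Router's own user.
          return UserGroupInformation.getCurrentUser();
        }
        if (UserGroupInformation.isSecurityEnabled()) {
          // Secure mode: proxy on top of the login (keytab) user.
          return UserGroupInformation.createProxyUser(userName,
              UserGroupInformation.getLoginUser());
        }
        if (userName.equalsIgnoreCase(
            UserGroupInformation.getCurrentUser().getUserName())) {
          // Same user as the router process: no proxy needed.
          return UserGroupInformation.getCurrentUser();
        }
        // Different simple-auth user: proxy on top of the current user.
        return UserGroupInformation.createProxyUser(userName,
            UserGroupInformation.getCurrentUser());
      }
    }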