Commit 33e630b

Merge branch 'apache:trunk' into HADOOP-18359

slfan1989 authored Nov 8, 2022
2 parents 87e9cf3 + 845cf8b

Showing 6 changed files with 127 additions and 64 deletions.
==== File 1 of 6 ====

@@ -349,9 +349,6 @@ public static ClientProtocol createProxyWithAlignmentContext(
       boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
       AlignmentContext alignmentContext)
       throws IOException {
-    if (alignmentContext == null) {
-      alignmentContext = new ClientGSIContext();
-    }
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
         ProtobufRpcEngine2.class);
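Note: the deleted branch above silently defaulted a null alignmentContext to a fresh
ClientGSIContext; after this change the caller decides which alignment context to use.
A minimal caller-side sketch (hypothetical call site, for illustration only; preceding
arguments elided):

    // The caller now supplies the alignment context explicitly instead of
    // relying on the removed null-check inside createProxyWithAlignmentContext().
    AlignmentContext alignmentContext = new ClientGSIContext(); // tracks state IDs for observer reads
    ClientProtocol proxy = createProxyWithAlignmentContext(
        /* ...other arguments... */ withRetries, fallbackToSimpleAuth, alignmentContext);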
==== File 2 of 6 ====

@@ -91,6 +91,7 @@
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
@@ -233,6 +234,20 @@ public FileSystem getFileSystem() throws IOException {
       return DistributedFileSystem.get(conf);
     }
 
+    public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException {
+      Configuration observerReadConf = new Configuration(conf);
+      observerReadConf.set(DFS_NAMESERVICES,
+          observerReadConf.get(DFS_NAMESERVICES) + ",router-service");
+      observerReadConf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
+      observerReadConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + ".router-service.router1",
+          getFileSystemURI().toString());
+      observerReadConf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+          + "." + "router-service", ObserverReadProxyProvider.class.getName());
+      DistributedFileSystem.setDefaultUri(observerReadConf, "hdfs://router-service");
+
+      return DistributedFileSystem.get(observerReadConf);
+    }
+
     public DFSClient getClient(UserGroupInformation user)
         throws IOException, URISyntaxException, InterruptedException {
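For context, a sketch of how a test might use the new helper (hypothetical snippet,
not part of this commit; assumes a started MiniRouterDFSCluster):

    // Obtain a client that reaches the router through ObserverReadProxyProvider,
    // so eligible read calls may be served by observer namenodes.
    RouterContext routerContext = cluster.getRandomRouter();
    FileSystem fs = routerContext.getFileSystemWithObserverReadsEnabled();
    Path path = new Path("/testFile");
    fs.create(path).close(); // write: handled by the active namenode
    fs.open(path).close();   // read: may be redirected to an observer
    fs.close();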
==== File 3 of 6 ====

@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
@@ -41,15 +42,40 @@
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
 
-public class TestObserverWithRouter {
-
+public class TestObserverWithRouter {
+  private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
   private MiniRouterDFSCluster cluster;
+  private RouterContext routerContext;
+  private FileSystem fileSystem;
 
-  public void startUpCluster(int numberOfObserver) throws Exception {
-    startUpCluster(numberOfObserver, null);
+  @BeforeEach
+  void init(TestInfo info) throws Exception {
+    if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) {
+      return;
+    }
+    startUpCluster(2, null);
   }
 
+  @AfterEach
+  public void teardown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+
+    routerContext = null;
+
+    if (fileSystem != null) {
+      fileSystem.close();
+      fileSystem = null;
+    }
+  }
+
   public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
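The @BeforeEach/@Tag combination above lets individual tests opt out of the shared
cluster startup. A self-contained sketch of the same JUnit 5 mechanism (illustrative
class, not part of this commit):

    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Tag;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.TestInfo;

    class TagSkipExample {
      private static final String SKIP_SETUP = "SkipSetup";

      @BeforeEach
      void init(TestInfo info) {
        // TestInfo exposes the tags declared on the current test method.
        if (info.getTags().contains(SKIP_SETUP)) {
          return; // tagged tests perform their own setup
        }
        // ...expensive shared setup would run here...
      }

      @Test
      @Tag(SKIP_SETUP)
      void testWithCustomSetup() {
        // runs without the shared setup
      }
    }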
Expand Down Expand Up @@ -95,31 +121,39 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th
cluster.installMockLocations();

cluster.waitActiveNamespaces();
routerContext = cluster.getRandomRouter();
fileSystem = routerContext.getFileSystemWithObserverReadsEnabled();
}

@After
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
@Test
public void testObserverRead() throws Exception {
internalTestObserverRead();
}

/**
* Tests that without adding config to use ObserverProxyProvider, the client shouldn't
* have reads served by Observers.
* Fixes regression in HDFS-13522.
*/
@Test
public void testObserverRead() throws Exception {
startUpCluster(1);
RouterContext routerContext = cluster.getRandomRouter();
public void testReadWithoutObserverClientConfigurations() throws Exception {
fileSystem.close();
fileSystem = routerContext.getFileSystem();
assertThrows(AssertionError.class, this::internalTestObserverRead);
}

public void internalTestObserverRead()
throws Exception {
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile");
// Send Create call to active
// Send create call
fileSystem.create(path).close();

// Send read request to observer
// Send read request
fileSystem.open(path).close();

long rpcCountForActive = routerContext.getRouter().getRpcServer()
@@ -131,21 +165,19 @@ public void testObserverRead() throws Exception {
         .getRPCMetrics().getObserverProxyOps();
     // getBlockLocations should be sent to observer
     assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
-    fileSystem.close();
   }

   @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
   public void testObserverReadWithoutFederatedStatePropagation() throws Exception {
     Configuration confOverrides = new Configuration(false);
     confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
-    startUpCluster(1, confOverrides);
-    RouterContext routerContext = cluster.getRandomRouter();
+    startUpCluster(2, confOverrides);
     List<? extends FederationNamenodeContext> namenodes = routerContext
         .getRouter().getNamenodeResolver()
         .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
     assertEquals("First namenode should be observer", namenodes.get(0).getState(),
         FederationNamenodeServiceState.OBSERVER);
-    FileSystem fileSystem = routerContext.getFileSystem();
     Path path = new Path("/testFile");
     // Send Create call to active
     fileSystem.create(path).close();
@@ -161,22 +193,19 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception {
     long rpcCountForObserver = routerContext.getRouter().getRpcServer()
         .getRPCMetrics().getObserverProxyOps();
     assertEquals("No call should be sent to observer", 0, rpcCountForObserver);
-    fileSystem.close();
   }

   @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
   public void testDisablingObserverReadUsingNameserviceOverride() throws Exception {
     // Disable observer reads using per-nameservice override
     Configuration confOverrides = new Configuration(false);
     confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
-    startUpCluster(1, confOverrides);
+    startUpCluster(2, confOverrides);
 
-    RouterContext routerContext = cluster.getRandomRouter();
-    FileSystem fileSystem = routerContext.getFileSystem();
     Path path = new Path("/testFile");
     fileSystem.create(path).close();
     fileSystem.open(path).close();
-    fileSystem.close();
 
     long rpcCountForActive = routerContext.getRouter().getRpcServer()
         .getRPCMetrics().getActiveProxyOps();
@@ -190,17 +219,15 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception {
 
   @Test
   public void testReadWhenObserverIsDown() throws Exception {
-    startUpCluster(1);
-    RouterContext routerContext = cluster.getRandomRouter();
-    FileSystem fileSystem = routerContext.getFileSystem();
     Path path = new Path("/testFile1");
     // Send Create call to active
     fileSystem.create(path).close();
 
     // Stop observer NN
     int nnIndex = stopObserver(1);
 
-    assertNotEquals("No observer found", 3, nnIndex);
+    nnIndex = stopObserver(1);
+    assertNotEquals("No observer found", 4, nnIndex);
 
     // Send read request
     fileSystem.open(path).close();
@@ -215,14 +242,10 @@ public void testReadWhenObserverIsDown() throws Exception {
         .getRPCMetrics().getObserverProxyOps();
     assertEquals("No call should send to observer", 0,
         rpcCountForObserver);
-    fileSystem.close();
   }
 
   @Test
   public void testMultipleObserver() throws Exception {
-    startUpCluster(2);
-    RouterContext routerContext = cluster.getRandomRouter();
-    FileSystem fileSystem = routerContext.getFileSystem();
     Path path = new Path("/testFile1");
     // Send Create call to active
     fileSystem.create(path).close();
@@ -267,7 +290,6 @@ public void testMultipleObserver() throws Exception {
         .getRpcServer().getRPCMetrics().getObserverProxyOps();
     assertEquals("No call should send to observer",
         expectedObserverRpc, rpcCountForObserver);
-    fileSystem.close();
   }
 
   private int stopObserver(int num) {
@@ -288,9 +310,9 @@ private int stopObserver(int num) {
   // Test router observer with multiple observers, to know which observer NN
   // received the requests.
   @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
   public void testMultipleObserverRouter() throws Exception {
     StateStoreDFSCluster innerCluster;
-    RouterContext routerContext;
     MembershipNamenodeResolver resolver;
 
     String ns0;
@@ -356,14 +378,12 @@ public void testMultipleObserverRouter() throws Exception {
         namespaceInfo0.get(1).getNamenodeId());
     assertEquals(namespaceInfo1.get(0).getState(),
         FederationNamenodeServiceState.OBSERVER);
+
+    innerCluster.shutdown();
   }
 
   @Test
   public void testUnavailableObserverNN() throws Exception {
-    startUpCluster(2);
-    RouterContext routerContext = cluster.getRandomRouter();
-    FileSystem fileSystem = routerContext.getFileSystem();
-
     stopObserver(2);
 
     Path path = new Path("/testFile");
@@ -397,12 +417,10 @@ public void testUnavailableObserverNN() throws Exception {
     assertTrue("There must be unavailable namenodes", hasUnavailable);
   }
 
 
 
   @Test
   public void testRouterMsync() throws Exception {
-    startUpCluster(1);
-    RouterContext routerContext = cluster.getRandomRouter();
 
     FileSystem fileSystem = routerContext.getFileSystem();
     Path path = new Path("/testFile");
 
     // Send Create call to active
@@ -420,6 +438,5 @@ public void testRouterMsync() throws Exception {
     // 2 msync calls should be sent. One to each active namenode in the two namespaces.
     assertEquals("Four calls should be sent to active", 4,
         rpcCountForActive);
-    fileSystem.close();
   }
 }
==== File 4 of 6 ====

@@ -28,7 +28,7 @@ $(document).ready(function() {
       var capabilityArr = scTableData.filter(item => (item.subcluster === row.id()));
       var capabilityObj = JSON.parse(capabilityArr[0].capability).clusterMetrics;
       row.child(
-        '<table>' +
+        '<table style="line-height:25px;" >' +
         '  <tr>' +
         '    <td>' +
         '      <h3>Application Metrics</h3>' +
@@ -42,11 +42,12 @@ $(document).ready(function() {
         '    <td>' +
         '      <h3>Resource Metrics</h3>' +
         '      <h4>Memory</h4>' +
-        '      TotalMB : ' + capabilityObj.totalMB + ' </p>' +
-        '      ReservedMB : ' + capabilityObj.reservedMB + ' </p>' +
-        '      AvailableMB : ' + capabilityObj.availableMB + ' </p>' +
-        '      AllocatedMB : ' + capabilityObj.allocatedMB + ' </p>' +
-        '      PendingMB : ' + capabilityObj.pendingMB + ' </p>' +
+        '      Total Memory : ' + capabilityArr[0].totalmemory + ' </p>' +
+        '      Reserved Memory : ' + capabilityArr[0].reservedmemory + ' </p>' +
+        '      Available Memory : ' + capabilityArr[0].availablememory + ' </p>' +
+        '      Allocated Memory : ' + capabilityArr[0].allocatedmemory + ' </p>' +
+        '      Pending Memory : ' + capabilityArr[0].pendingmemory + ' </p>' +
+        '      <hr />' +
         '      <h4>VirtualCores</h4>' +
         '      TotalVirtualCores : ' + capabilityObj.totalVirtualCores + ' </p>' +
         '      ReservedVirtualCores : ' + capabilityObj.reservedVirtualCores + ' </p>' +
==== File 5 of 6 ====

@@ -23,12 +23,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Date;
 
 import com.google.gson.Gson;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.commons.lang3.time.DateFormatUtils;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.apache.hadoop.yarn.server.router.Router;
 import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
@@ -149,32 +150,51 @@ private void initHtmlPageFederation(Block html, boolean isEnabled) {
       ClusterMetricsInfo subClusterInfo = getClusterMetricsInfo(capability);
 
       // Prepare LastStartTime & LastHeartBeat
-      String lastStartTime =
-          DateFormatUtils.format(subcluster.getLastStartTime(), DATE_PATTERN);
-      String lastHeartBeat =
-          DateFormatUtils.format(subcluster.getLastHeartBeat(), DATE_PATTERN);
+      Date lastStartTime = new Date(subcluster.getLastStartTime());
+      Date lastHeartBeat = new Date(subcluster.getLastHeartBeat());
 
       // Prepare Resource
       long totalMB = subClusterInfo.getTotalMB();
       String totalMBDesc = StringUtils.byteDesc(totalMB * BYTES_IN_MB);
       long totalVirtualCores = subClusterInfo.getTotalVirtualCores();
-      String resources = String.format("<Memory:%s, VCore:%s>", totalMBDesc, totalVirtualCores);
+      String resources = String.format("<memory:%s, vCores:%s>", totalMBDesc, totalVirtualCores);
 
       // Prepare Node
       long totalNodes = subClusterInfo.getTotalNodes();
       long activeNodes = subClusterInfo.getActiveNodes();
-      String nodes = String.format("<Total Nodes:%s, Active Nodes:%s>", totalNodes, activeNodes);
+      String nodes = String.format("<totalNodes:%s, activeNodes:%s>", totalNodes, activeNodes);
 
       // Prepare HTML Table
+      String stateStyle = "color:#dc3545;font-weight:bolder";
+      SubClusterState state = subcluster.getState();
+      if (SubClusterState.SC_RUNNING == state) {
+        stateStyle = "color:#28a745;font-weight:bolder";
+      }
+
       tbody.tr().$id(subClusterIdText)
           .td().$class("details-control").a(herfWebAppAddress, subClusterIdText).__()
-          .td(subcluster.getState().name())
-          .td(lastStartTime)
-          .td(lastHeartBeat)
+          .td().$style(stateStyle).__(state.name()).__()
+          .td().__(lastStartTime).__()
+          .td().__(lastHeartBeat).__()
           .td(resources)
           .td(nodes)
           .__();
 
+      // Formatted memory information
+      long allocatedMB = subClusterInfo.getAllocatedMB();
+      String allocatedMBDesc = StringUtils.byteDesc(allocatedMB * BYTES_IN_MB);
+      long availableMB = subClusterInfo.getAvailableMB();
+      String availableMBDesc = StringUtils.byteDesc(availableMB * BYTES_IN_MB);
+      long pendingMB = subClusterInfo.getPendingMB();
+      String pendingMBDesc = StringUtils.byteDesc(pendingMB * BYTES_IN_MB);
+      long reservedMB = subClusterInfo.getReservedMB();
+      String reservedMBDesc = StringUtils.byteDesc(reservedMB * BYTES_IN_MB);
+
+      subclusterMap.put("totalmemory", totalMBDesc);
+      subclusterMap.put("allocatedmemory", allocatedMBDesc);
+      subclusterMap.put("availablememory", availableMBDesc);
+      subclusterMap.put("pendingmemory", pendingMBDesc);
+      subclusterMap.put("reservedmemory", reservedMBDesc);
       subclusterMap.put("subcluster", subClusterId.getId());
       subclusterMap.put("capability", capability);
       lists.add(subclusterMap);
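The StringUtils.byteDesc() calls above pre-format the raw MB counters into the
human-readable strings that the table in file 4 now reads from scTableData instead
of raw clusterMetrics fields. A quick illustration, assuming BYTES_IN_MB is
1024 * 1024 (the constant is defined outside this diff):

    // byteDesc() renders a byte count with a binary-prefix unit, e.g. "10 GB".
    long totalMB = 10240;
    String totalMBDesc = StringUtils.byteDesc(totalMB * 1024L * 1024L);
    subclusterMap.put("totalmemory", totalMBDesc);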
==== File 6 of 6 (diff not shown) ====
