Commit

Merge branch 'apache:trunk' into YARN-11158-V3
slfan1989 authored Nov 10, 2022
2 parents 3877ee8 + b398a7b commit 8e5a381
Showing 22 changed files with 294 additions and 91 deletions.
@@ -1393,8 +1393,7 @@ private class Listener extends Thread {
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      //Could be an ephemeral port
      this.listenPort = acceptChannel.socket().getLocalPort();
-     Thread.currentThread().setName("Listener at " +
-         bindAddress + "/" + this.listenPort);
+     LOG.info("Listener at {}:{}", bindAddress, this.listenPort);
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
@@ -140,14 +140,14 @@ public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
    if (all || changed()) {
      numSamples += intervalStat.numSamples();
      builder.addCounter(numInfo, numSamples)
-            .addGauge(avgInfo, lastStat().mean());
+            .addGauge(avgInfo, intervalStat.mean());
      if (extended) {
-       builder.addGauge(stdevInfo, lastStat().stddev())
-              .addGauge(iMinInfo, lastStat().min())
-              .addGauge(iMaxInfo, lastStat().max())
+       builder.addGauge(stdevInfo, intervalStat.stddev())
+              .addGauge(iMinInfo, intervalStat.min())
+              .addGauge(iMaxInfo, intervalStat.max())
               .addGauge(minInfo, minMax.min())
               .addGauge(maxInfo, minMax.max())
-              .addGauge(iNumInfo, lastStat().numSamples());
+              .addGauge(iNumInfo, intervalStat.numSamples());
      }
      if (changed()) {
        if (numSamples > 0) {
@@ -290,6 +290,27 @@ private static void snapshotMutableRatesWithAggregation(
    }
  }

+ /**
+  * MutableStat should output 0 instead of the previous state when there is no change.
+  */
+ @Test public void testMutableWithoutChanged() {
+   MetricsRecordBuilder builderWithChange = mockMetricsRecordBuilder();
+   MetricsRecordBuilder builderWithoutChange = mockMetricsRecordBuilder();
+   MetricsRegistry registry = new MetricsRegistry("test");
+   MutableStat stat = registry.newStat("Test", "Test", "Ops", "Val", true);
+   stat.add(1000, 1000);
+   stat.add(1000, 2000);
+   registry.snapshot(builderWithChange, true);
+
+   assertCounter("TestNumOps", 2000L, builderWithChange);
+   assertGauge("TestINumOps", 2000L, builderWithChange);
+   assertGauge("TestAvgVal", 1.5, builderWithChange);
+
+   registry.snapshot(builderWithoutChange, true);
+   assertGauge("TestINumOps", 0L, builderWithoutChange);
+   assertGauge("TestAvgVal", 0.0, builderWithoutChange);
+ }
+
  @Test
  public void testDuplicateMetrics() {
    MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
@@ -349,9 +349,6 @@ public static ClientProtocol createProxyWithAlignmentContext(
      boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
      AlignmentContext alignmentContext)
      throws IOException {
-   if (alignmentContext == null) {
-     alignmentContext = new ClientGSIContext();
-   }
    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
        ProtobufRpcEngine2.class);

@@ -91,6 +91,7 @@
  import org.apache.hadoop.hdfs.server.namenode.FSImage;
  import org.apache.hadoop.hdfs.server.namenode.NameNode;
  import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+ import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
  import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
  import org.apache.hadoop.http.HttpConfig;
  import org.apache.hadoop.net.NetUtils;
@@ -233,6 +234,20 @@ public FileSystem getFileSystem() throws IOException {
      return DistributedFileSystem.get(conf);
    }

+   public FileSystem getFileSystemWithObserverReadsEnabled() throws IOException {
+     Configuration observerReadConf = new Configuration(conf);
+     observerReadConf.set(DFS_NAMESERVICES,
+         observerReadConf.get(DFS_NAMESERVICES)+ ",router-service");
+     observerReadConf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
+     observerReadConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1",
+         getFileSystemURI().toString());
+     observerReadConf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+         + "." + "router-service", ObserverReadProxyProvider.class.getName());
+     DistributedFileSystem.setDefaultUri(observerReadConf, "hdfs://router-service");
+
+     return DistributedFileSystem.get(observerReadConf);
+   }
+
    public DFSClient getClient(UserGroupInformation user)
        throws IOException, URISyntaxException, InterruptedException {

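For context, the new getFileSystemWithObserverReadsEnabled() helper builds a client Configuration that points "hdfs://router-service" at the router and selects ObserverReadProxyProvider. A minimal usage sketch, assuming a running MiniRouterDFSCluster ("cluster") and the RouterContext API shown in this diff; the method name and the write/read expectations mirror the tests that follow:

  // Hypothetical test fragment, not part of this commit; assumes the fields and
  // helpers used in TestObserverWithRouter below.
  @Test
  public void exampleObserverReadThroughRouter() throws Exception {
    RouterContext router = cluster.getRandomRouter();
    FileSystem fs = router.getFileSystemWithObserverReadsEnabled();
    Path path = new Path("/testFile");
    fs.create(path).close();   // the create call is sent to the active namenode
    fs.open(path).close();     // the read (getBlockLocations) can be served by an observer
    fs.close();
  }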
@@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
@@ -41,15 +42,40 @@
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.junit.After;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;

public class TestObserverWithRouter {

public class TestObserverWithRouter {
private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
private MiniRouterDFSCluster cluster;
private RouterContext routerContext;
private FileSystem fileSystem;

public void startUpCluster(int numberOfObserver) throws Exception {
startUpCluster(numberOfObserver, null);
@BeforeEach
void init(TestInfo info) throws Exception {
if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) {
return;
}
startUpCluster(2, null);
}

@AfterEach
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}

routerContext = null;

if (fileSystem != null) {
fileSystem.close();
fileSystem = null;
}
}

public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
@@ -95,31 +121,39 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
cluster.installMockLocations();

cluster.waitActiveNamespaces();
routerContext = cluster.getRandomRouter();
fileSystem = routerContext.getFileSystemWithObserverReadsEnabled();
}

@After
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
@Test
public void testObserverRead() throws Exception {
internalTestObserverRead();
}

/**
* Tests that without adding config to use ObserverProxyProvider, the client shouldn't
* have reads served by Observers.
* Fixes regression in HDFS-13522.
*/
@Test
public void testObserverRead() throws Exception {
startUpCluster(1);
RouterContext routerContext = cluster.getRandomRouter();
public void testReadWithoutObserverClientConfigurations() throws Exception {
fileSystem.close();
fileSystem = routerContext.getFileSystem();
assertThrows(AssertionError.class, this::internalTestObserverRead);
}

public void internalTestObserverRead()
throws Exception {
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile");
// Send Create call to active
// Send create call
fileSystem.create(path).close();

// Send read request to observer
// Send read request
fileSystem.open(path).close();

long rpcCountForActive = routerContext.getRouter().getRpcServer()
@@ -131,21 +165,19 @@ public void testObserverRead() throws Exception {
.getRPCMetrics().getObserverProxyOps();
// getBlockLocations should be sent to observer
assertEquals("One call should be sent to observer", 1, rpcCountForObserver);
fileSystem.close();
}

@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testObserverReadWithoutFederatedStatePropagation() throws Exception {
Configuration confOverrides = new Configuration(false);
confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
startUpCluster(1, confOverrides);
RouterContext routerContext = cluster.getRandomRouter();
startUpCluster(2, confOverrides);
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile");
// Send Create call to active
fileSystem.create(path).close();
@@ -161,22 +193,19 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception
long rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
assertEquals("No call should be sent to observer", 0, rpcCountForObserver);
fileSystem.close();
}

@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testDisablingObserverReadUsingNameserviceOverride() throws Exception {
// Disable observer reads using per-nameservice override
Configuration confOverrides = new Configuration(false);
confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
startUpCluster(1, confOverrides);
startUpCluster(2, confOverrides);

RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile");
fileSystem.create(path).close();
fileSystem.open(path).close();
fileSystem.close();

long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
@@ -190,17 +219,15 @@ public void testDisablingObserverReadUsingNameserviceOverride() throws Exception

@Test
public void testReadWhenObserverIsDown() throws Exception {
startUpCluster(1);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();

// Stop observer NN
int nnIndex = stopObserver(1);

assertNotEquals("No observer found", 3, nnIndex);
nnIndex = stopObserver(1);
assertNotEquals("No observer found", 4, nnIndex);

// Send read request
fileSystem.open(path).close();
@@ -215,14 +242,10 @@ public void testReadWhenObserverIsDown() throws Exception {
.getRPCMetrics().getObserverProxyOps();
assertEquals("No call should send to observer", 0,
rpcCountForObserver);
fileSystem.close();
}

@Test
public void testMultipleObserver() throws Exception {
startUpCluster(2);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();
@@ -267,7 +290,6 @@ public void testMultipleObserver() throws Exception {
.getRpcServer().getRPCMetrics().getObserverProxyOps();
assertEquals("No call should send to observer",
expectedObserverRpc, rpcCountForObserver);
fileSystem.close();
}

private int stopObserver(int num) {
@@ -288,9 +310,9 @@ private int stopObserver(int num) {
// test router observer with multiple to know which observer NN received
// requests
@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testMultipleObserverRouter() throws Exception {
StateStoreDFSCluster innerCluster;
RouterContext routerContext;
MembershipNamenodeResolver resolver;

String ns0;
@@ -356,14 +378,12 @@ public void testMultipleObserverRouter() throws Exception {
namespaceInfo0.get(1).getNamenodeId());
assertEquals(namespaceInfo1.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);

innerCluster.shutdown();
}

@Test
public void testUnavailableObserverNN() throws Exception {
startUpCluster(2);
RouterContext routerContext = cluster.getRandomRouter();
FileSystem fileSystem = routerContext.getFileSystem();

stopObserver(2);

Path path = new Path("/testFile");
@@ -397,12 +417,10 @@ public void testUnavailableObserverNN() throws Exception {
assertTrue("There must be unavailable namenodes", hasUnavailable);
}



@Test
public void testRouterMsync() throws Exception {
startUpCluster(1);
RouterContext routerContext = cluster.getRandomRouter();

FileSystem fileSystem = routerContext.getFileSystem();
Path path = new Path("/testFile");

// Send Create call to active
@@ -420,6 +438,5 @@ public void testRouterMsync() throws Exception {
// 2 msync calls should be sent. One to each active namenode in the two namespaces.
assertEquals("Four calls should be sent to active", 4,
rpcCountForActive);
fileSystem.close();
}
}
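The test class above also moves from JUnit 4 to JUnit 5, using @BeforeEach with an injected TestInfo and a @Tag so that individual tests can opt out of the shared cluster startup. A condensed sketch of that pattern; the class, tag, and fixture names here are illustrative and not part of the commit:

  import org.junit.jupiter.api.BeforeEach;
  import org.junit.jupiter.api.Tag;
  import org.junit.jupiter.api.Test;
  import org.junit.jupiter.api.TestInfo;

  class TaggedSetupExample {
    private static final String SKIP_SHARED_SETUP = "SkipSharedSetup";

    @BeforeEach
    void init(TestInfo info) throws Exception {
      // JUnit 5 injects TestInfo; tests tagged with SKIP_SHARED_SETUP skip the
      // default fixture and start their own inside the test body.
      if (info.getTags().contains(SKIP_SHARED_SETUP)) {
        return;
      }
      startSharedFixture();
    }

    @Test
    void usesSharedFixture() {
      // runs against the fixture started in init()
    }

    @Test
    @Tag(SKIP_SHARED_SETUP)
    void usesCustomFixture() throws Exception {
      startCustomFixture();  // e.g. a cluster started with config overrides, as above
    }

    private void startSharedFixture() throws Exception { /* start the default fixture */ }
    private void startCustomFixture() throws Exception { /* start a fixture with overrides */ }
  }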
@@ -3616,7 +3616,7 @@ private Block addStoredBlock(final BlockInfo block,
    if (storedBlock == null || storedBlock.isDeleted()) {
      // If this block does not belong to anyfile, then we are done.
      blockLog.debug("BLOCK* addStoredBlock: {} on {} size {} but it does not belong to any file",
-         block, node, block.getNumBytes());
+         reportedBlock, node, reportedBlock.getNumBytes());
      // we could add this block to invalidate set of this datanode.
      // it will happen in next block report otherwise.
      return block;
@@ -3631,12 +3631,12 @@ private Block addStoredBlock(final BlockInfo block,
        (node.isDecommissioned() || node.isDecommissionInProgress()) ? 0 : 1;
      if (logEveryBlock) {
        blockLog.info("BLOCK* addStoredBlock: {} is added to {} (size={})",
-           node, storedBlock, storedBlock.getNumBytes());
+           node, reportedBlock, reportedBlock.getNumBytes());
      }
    } else if (result == AddBlockResult.REPLACED) {
      curReplicaDelta = 0;
      blockLog.warn("BLOCK* addStoredBlock: block {} moved to storageType " +
-         "{} on node {}", storedBlock, storageInfo.getStorageType(), node);
+         "{} on node {}", reportedBlock, storageInfo.getStorageType(), node);
    } else {
      // if the same block is added again and the replica was corrupt
      // previously because of a wrong gen stamp, remove it from the
@@ -3646,8 +3646,8 @@ private Block addStoredBlock(final BlockInfo block,
      curReplicaDelta = 0;
      if (blockLog.isDebugEnabled()) {
        blockLog.debug("BLOCK* addStoredBlock: Redundant addStoredBlock request"
-           + " received for {} on node {} size {}", storedBlock, node,
-           storedBlock.getNumBytes());
+           + " received for {} on node {} size {}", reportedBlock, node,
+           reportedBlock.getNumBytes());
      }
    }
