Merge branch 'apache:trunk' into YARN-11525
slfan1989 authored Jul 11, 2023
2 parents c31170e + c13d929 commit 33c7dc1
Showing 102 changed files with 46,662 additions and 1,555 deletions.
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import javax.annotation.Nullable;

/**
* Holds a reference to an object to be attached to a stream or store to
* avoid the reference being lost to GC.
*/
public class BackReference {
private final Object reference;

public BackReference(@Nullable Object reference) {
this.reference = reference;
}

/**
* Is the reference null?
* @return true if the reference is null, else false.
*/
public boolean isNull() {
return reference == null;
}

@Override
public String toString() {
return "BackReference{" +
"reference=" + reference +
'}';
}
}
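
As an illustrative aside (not part of this commit; the ExampleInputStream class and its fields below are hypothetical), a minimal sketch of the pattern this class supports: a stream keeps a BackReference to its owning store so that the owner is not garbage-collected while the stream is still open.

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.fs.impl.BackReference;

/** Hypothetical stream that pins its owning store via a BackReference. */
class ExampleInputStream extends InputStream {

  /** Back reference to the owning filesystem/store; may wrap null. */
  private final BackReference fsBackRef;

  ExampleInputStream(Object owningStore) {
    // BackReference tolerates a null owner, so no null check is needed here.
    this.fsBackRef = new BackReference(owningStore);
  }

  @Override
  public int read() throws IOException {
    // Real read logic would go here; -1 signals end of stream.
    return -1;
  }

  @Override
  public String toString() {
    // isNull() lets diagnostics report whether an owner was attached.
    return "ExampleInputStream{"
        + (fsBackRef.isNull() ? "no back reference" : fsBackRef)
        + "}";
  }
}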
@@ -2321,6 +2321,15 @@ The switch to turn S3A auditing on or off.
</description>
</property>

<property>
<name>ipc.server.handler.queue.size</name>
<value>100</value>
<description>
Indicates how many calls per handler are allowed in the queue. The maximum
call queue size is this value multiplied by the number of handler threads.
</description>
</property>
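
For illustration only (the handler-thread count below is an assumed example value, not something set by this change): the call queue capacity is the handler count multiplied by this per-handler queue size, e.g. 10 handlers × 100 = 1,000 queued calls. A minimal sketch:

import org.apache.hadoop.conf.Configuration;

public class CallQueueCapacityExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Per-handler queue size; defaults to 100 as documented above.
    int perHandlerQueueSize = conf.getInt("ipc.server.handler.queue.size", 100);
    // Assumed handler thread count, for this example only.
    int handlerCount = 10;
    // Maximum call queue size = handler threads * per-handler queue size.
    int maxCallQueueSize = handlerCount * perHandlerQueueSize;
    System.out.println("Maximum call queue size: " + maxCallQueueSize); // 1000
  }
}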

<property>
<name>ipc.server.listen.queue.size</name>
<value>256</value>
@@ -206,9 +206,9 @@ public class Balancer {
+ "on over-utilized machines."
+ "\n\t[-asService]\tRun as a long running service."
+ "\n\t[-sortTopNodes]"
+ "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks."
+ "\tSort datanodes based on the utilization so "
+ "that highly utilized datanodes get scheduled first.";
+ "that highly utilized datanodes get scheduled first."
+ "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks.";

@VisibleForTesting
private static volatile boolean serviceRunning = false;
@@ -493,7 +493,9 @@ void compute() throws IOException {
checksumBlock(block, idx, liveBlkInfo.getToken(),
liveBlkInfo.getDn());
} catch (IOException ioe) {
LOG.warn("Exception while reading checksum", ioe);
String msg = String.format("Exception while reading checksum for block %s at index " +
"%d in blockGroup %s", block, idx, blockGroup);
LOG.warn(msg, ioe);
// reconstruct block and calculate checksum for the failed node
recalculateChecksum(idx, block.getNumBytes());
}
@@ -308,13 +308,6 @@ Map<String, BPOfferService> getBpByNameserviceId() {
return bpByNameserviceId;
}

boolean isSlownodeByNameserviceId(String nsId) {
if (bpByNameserviceId.containsKey(nsId)) {
return bpByNameserviceId.get(nsId).isSlownode();
}
return false;
}

boolean isSlownodeByBlockPoolId(String bpId) {
if (bpByBlockPoolId.containsKey(bpId)) {
return bpByBlockPoolId.get(bpId).isSlownode();
@@ -4240,10 +4240,6 @@ public DataSetLockManager getDataSetLockManager() {
return dataSetLockManager;
}

boolean isSlownodeByNameserviceId(String nsId) {
return blockPoolManager.isSlownodeByNameserviceId(nsId);
}

boolean isSlownodeByBlockPoolId(String bpId) {
return blockPoolManager.isSlownodeByBlockPoolId(bpId);
}
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode.metrics;

import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.lib.Interns;
@@ -126,6 +126,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
public static final String ENTERING_MAINTENANCE_STATUS =
"is ENTERING MAINTENANCE";
public static final String IN_MAINTENANCE_STATUS = "is IN MAINTENANCE";
public static final String STALE_STATUS = "is STALE";
public static final String NONEXISTENT_STATUS = "does not exist";
public static final String FAILURE_STATUS = "FAILED";
public static final String UNDEFINED = "undefined";
@@ -370,6 +371,8 @@ private void printDatanodeReplicaStatus(Block block,
out.print(ENTERING_MAINTENANCE_STATUS);
} else if (this.showMaintenanceState && dn.isInMaintenance()) {
out.print(IN_MAINTENANCE_STATUS);
} else if (dn.isStale(this.staleInterval)) {
out.print(STALE_STATUS);
} else {
out.print(HEALTHY_STATUS);
}
@@ -404,6 +404,8 @@ else if (args[idx].equals("-replicaDetails")) {
errCode = 4;
} else if (lastLine.endsWith(NamenodeFsck.ENTERING_MAINTENANCE_STATUS)) {
errCode = 5;
} else if (lastLine.endsWith(NamenodeFsck.STALE_STATUS)) {
errCode = 6;
}
return errCode;
}
@@ -292,6 +292,8 @@ Usage:
[-idleiterations <idleiterations>]
[-runDuringUpgrade]
[-asService]
[-sortTopNodes]
[-hotBlockTimeInterval <specified time interval>]

| COMMAND\_OPTION | Description |
|:---- |:---- |
@@ -304,6 +306,7 @@ Usage:
| `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). |
| `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. |
| `-asService` | Run Balancer as a long running service. |
| `-sortTopNodes` | Sort datanodes based on the utilization so that highly utilized datanodes get scheduled first. |
| `-hotBlockTimeInterval` | Prefer moving cold blocks i.e blocks associated with files accessed or modified before the specified time interval. |
| `-h`\|`--help` | Display the tool usage and help information and exit. |

@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -1681,6 +1682,91 @@ public Boolean get() {
assertFalse(fsckOut.contains(NamenodeFsck.IN_MAINTENANCE_STATUS));
}

/**
* Test for blockIdCK with datanode staleness.
*/
@Test
public void testBlockIdCKStaleness() throws Exception {
final short replFactor = 1;
final long blockSize = 512;
Configuration configuration = new Configuration();

// Shorten dfs.namenode.stale.datanode.interval for easier testing.
configuration.setLong(DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 5000);
configuration.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
configuration.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replFactor);

String[] racks = {"/rack1", "/rack2"};
String[] hosts = {"host1", "host2"};

File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(configuration, builderBaseDir)
.hosts(hosts).racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", fs);

try {
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();

// Create one file.
final String pathString = new String("/testfile");
final Path path = new Path(pathString);
util.createFile(fs, path, 1024L, replFactor, 1024L);
util.waitReplication(fs, path, replFactor);
StringBuilder sb = new StringBuilder();
for (LocatedBlock lb: util.getAllBlocks(fs, path)) {
sb.append(lb.getBlock().getLocalBlock().getBlockName() + " ");
}
String[] bIds = sb.toString().split(" ");

// Make sure datanode is HEALTHY before down.
String outStr = runFsck(configuration, 0, true, "/", "-blockId", bIds[0]);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));

FSNamesystem fsn = cluster.getNameNode().getNamesystem();
BlockManager bm = fsn.getBlockManager();
DatanodeManager dnm = bm.getDatanodeManager();
DatanodeDescriptor dn = dnm.getDatanode(cluster.getDataNodes().get(0)
.getDatanodeId());
final String dnName = dn.getXferAddr();

// Make the block on datanode enter stale state.
cluster.stopDataNode(0);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
DatanodeInfo datanodeInfo = null;
for (DatanodeInfo info : fs.getDataNodeStats()) {
if (dnName.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
if (datanodeInfo != null && datanodeInfo.isStale(5000)) {
return true;
}
} catch (Exception e) {
LOG.warn("Unexpected exception: " + e);
return false;
}
return false;
}
}, 500, 30000);
outStr = runFsck(configuration, 6, true, "/", "-blockId", bIds[0]);
assertTrue(outStr.contains(NamenodeFsck.STALE_STATUS));
} finally {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}

/**
* Test for blockIdCK with block corruption.
*/
@@ -227,6 +227,19 @@ private static int getAppAttemptId(Configuration conf) {
MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
}

/**
* Build a qualified parent path for the temporary multipart upload commit
* directory built by {@link #getMultipartUploadCommitsDirectory(Configuration, String)}.
* @param conf configuration defining default FS.
* @param uuid uuid of job
* @return a path which can be used for temporary work
* @throws IOException on an IO failure.
*/
public static Path getStagingUploadsParentDirectory(Configuration conf,
String uuid) throws IOException {
return getMultipartUploadCommitsDirectory(conf, uuid).getParent();
}

/**
* Build a qualified temporary path for the multipart upload commit
* information in the cluster filesystem.
@@ -501,8 +501,8 @@ public void cleanupStagingDirs() {

/**
* Staging committer cleanup includes calling wrapped committer's
* cleanup method, and removing all destination paths in the final
* filesystem.
* cleanup method, and removing staging uploads path and all
* destination paths in the final filesystem.
* @param commitContext commit context
* @param suppressExceptions should exceptions be suppressed?
* @throws IOException IO failures if exceptions are not suppressed.
@@ -515,6 +515,9 @@ protected void cleanup(CommitContext commitContext,
maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
() -> wrappedCommitter.cleanupJob(
commitContext.getJobContext()));
maybeIgnore(suppressExceptions, "Delete staging uploads path",
() -> deleteStagingUploadsParentDirectory(
commitContext.getJobContext()));
maybeIgnore(suppressExceptions, "Delete destination paths",
() -> deleteDestinationPaths(
commitContext.getJobContext()));
@@ -543,11 +546,26 @@ protected void abortJobInternal(CommitContext commitContext,
}
}

/**
* Delete the multipart upload staging directory.
* @param context job context
* @throws IOException IO failure
*/
protected void deleteStagingUploadsParentDirectory(JobContext context)
throws IOException {
Path stagingUploadsPath = Paths.getStagingUploadsParentDirectory(
context.getConfiguration(), getUUID());
ignoreIOExceptions(LOG,
"Deleting staging uploads path", stagingUploadsPath.toString(),
() -> deleteWithWarning(
stagingUploadsPath.getFileSystem(getConf()),
stagingUploadsPath,
true));
}

/**
* Delete the working paths of a job.
* <ol>
* <li>The job attempt path</li>
* <li>{@code $dest/__temporary}</li>
* <li>the local working directory for staged files</li>
* </ol>
@@ -556,14 +574,6 @@ protected void abortJobInternal(CommitContext commitContext,
* @throws IOException IO failure
*/
protected void deleteDestinationPaths(JobContext context) throws IOException {
Path attemptPath = getJobAttemptPath(context);
ignoreIOExceptions(LOG,
"Deleting Job attempt Path", attemptPath.toString(),
() -> deleteWithWarning(
getJobAttemptFileSystem(context),
attemptPath,
true));

// delete the __temporary directory. This will cause problems
// if there is >1 task targeting the same dest dir
deleteWithWarning(getDestFS(),
@@ -403,6 +403,30 @@ public JobData(Job job,
this.committer = committer;
conf = job.getConfiguration();
}

public Job getJob() {
return job;
}

public JobContext getJContext() {
return jContext;
}

public TaskAttemptContext getTContext() {
return tContext;
}

public AbstractS3ACommitter getCommitter() {
return committer;
}

public Configuration getConf() {
return conf;
}

public Path getWrittenTextPath() {
return writtenTextPath;
}
}

/**