Merge branch 'apache:trunk' into YARN-7707
slfan1989 authored Jul 10, 2023
2 parents 7fd1d6d + 37b2d36 commit 331daec
Showing 88 changed files with 46,413 additions and 1,539 deletions.
@@ -206,9 +206,9 @@ public class Balancer {
+ "on over-utilized machines."
+ "\n\t[-asService]\tRun as a long running service."
+ "\n\t[-sortTopNodes]"
+ "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks."
+ "\tSort datanodes based on the utilization so "
+ "that highly utilized datanodes get scheduled first.";
+ "that highly utilized datanodes get scheduled first."
+ "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks.";

@VisibleForTesting
private static volatile boolean serviceRunning = false;
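The hunk above moves the "-hotBlockTimeInterval" entry below the "-sortTopNodes" description so the flag is no longer separated from its explanation. Reconstructed from the string literals shown here (whitespace approximated), the affected part of the usage text now prints roughly as:

    [-asService]            Run as a long running service.
    [-sortTopNodes]         Sort datanodes based on the utilization so that highly utilized datanodes get scheduled first.
    [-hotBlockTimeInterval] prefer to move cold blocks.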
@@ -493,7 +493,9 @@ void compute() throws IOException {
checksumBlock(block, idx, liveBlkInfo.getToken(),
liveBlkInfo.getDn());
} catch (IOException ioe) {
LOG.warn("Exception while reading checksum", ioe);
String msg = String.format("Exception while reading checksum for block %s at index " +
"%d in blockGroup %s", block, idx, blockGroup);
LOG.warn(msg, ioe);
// reconstruct block and calculate checksum for the failed node
recalculateChecksum(idx, block.getNumBytes());
}
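The enriched message above is built with String.format and passed to LOG.warn together with the exception. Assuming LOG is an SLF4J logger (the logger declaration is outside this hunk), an equivalent parameterized call avoids the intermediate string and still records the stack trace, since SLF4J treats a trailing Throwable argument as the attached exception:

    // Equivalent sketch, assuming an org.slf4j.Logger named LOG and the same
    // block, idx and blockGroup variables as in the catch block above.
    LOG.warn("Exception while reading checksum for block {} at index {} in blockGroup {}",
        block, idx, blockGroup, ioe);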
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode.metrics;

import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.lib.Interns;
@@ -126,6 +126,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
public static final String ENTERING_MAINTENANCE_STATUS =
"is ENTERING MAINTENANCE";
public static final String IN_MAINTENANCE_STATUS = "is IN MAINTENANCE";
public static final String STALE_STATUS = "is STALE";
public static final String NONEXISTENT_STATUS = "does not exist";
public static final String FAILURE_STATUS = "FAILED";
public static final String UNDEFINED = "undefined";
@@ -370,6 +371,8 @@ private void printDatanodeReplicaStatus(Block block,
out.print(ENTERING_MAINTENANCE_STATUS);
} else if (this.showMaintenanceState && dn.isInMaintenance()) {
out.print(IN_MAINTENANCE_STATUS);
} else if (dn.isStale(this.staleInterval)) {
out.print(STALE_STATUS);
} else {
out.print(HEALTHY_STATUS);
}
@@ -404,6 +404,8 @@ else if (args[idx].equals("-replicaDetails")) {
errCode = 4;
} else if (lastLine.endsWith(NamenodeFsck.ENTERING_MAINTENANCE_STATUS)) {
errCode = 5;
} else if (lastLine.endsWith(NamenodeFsck.STALE_STATUS)) {
errCode = 6;
}
return errCode;
}
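Both hunks above hinge on datanode staleness: fsck now prints "is STALE" for a replica hosted on a stale datanode, and the -blockId exit-code mapping returns 6 when the last status line ends with that marker. A datanode counts as stale once its last heartbeat is older than dfs.namenode.stale.datanode.interval. A minimal, hypothetical sketch of that rule (not the HDFS implementation) is:

    // Hypothetical sketch: a node reads as stale once its last heartbeat is
    // older than the configured stale interval.
    public final class StalenessCheckSketch {
      private StalenessCheckSketch() {
      }

      static boolean isStale(long lastHeartbeatMillis, long staleIntervalMillis) {
        return System.currentTimeMillis() - lastHeartbeatMillis >= staleIntervalMillis;
      }

      public static void main(String[] args) throws InterruptedException {
        long lastHeartbeat = System.currentTimeMillis();
        Thread.sleep(100L);
        // With a 50 ms interval the node is already stale; fsck would print
        // "is STALE" and the -blockId path would map that to exit code 6.
        System.out.println(isStale(lastHeartbeat, 50L));
      }
    }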
@@ -292,6 +292,8 @@ Usage:
[-idleiterations <idleiterations>]
[-runDuringUpgrade]
[-asService]
[-sortTopNodes]
[-hotBlockTimeInterval <specified time interval>]

| COMMAND\_OPTION | Description |
|:---- |:---- |
@@ -304,6 +306,7 @@ Usage:
| `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). |
| `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. |
| `-asService` | Run Balancer as a long running service. |
| `-sortTopNodes` | Sort datanodes based on the utilization so that highly utilized datanodes get scheduled first. |
| `-hotBlockTimeInterval` | Prefer moving cold blocks i.e blocks associated with files accessed or modified before the specified time interval. |
| `-h`\|`--help` | Display the tool usage and help information and exit. |
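As an illustration only (this invocation is not part of the commit's documentation change; -threshold is one of the balancer's existing options), a run that schedules the most utilized datanodes first could look like:

    hdfs balancer -threshold 5 -sortTopNodes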

@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -1681,6 +1682,91 @@ public Boolean get() {
assertFalse(fsckOut.contains(NamenodeFsck.IN_MAINTENANCE_STATUS));
}

/**
* Test for blockIdCK with datanode staleness.
*/
@Test
public void testBlockIdCKStaleness() throws Exception {
final short replFactor = 1;
final long blockSize = 512;
Configuration configuration = new Configuration();

// Shorten dfs.namenode.stale.datanode.interval for easier testing.
configuration.setLong(DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 5000);
configuration.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
configuration.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replFactor);

String[] racks = {"/rack1", "/rack2"};
String[] hosts = {"host1", "host2"};

File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(configuration, builderBaseDir)
.hosts(hosts).racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
DistributedFileSystem fs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", fs);

try {
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();

// Create one file.
final String pathString = new String("/testfile");
final Path path = new Path(pathString);
util.createFile(fs, path, 1024L, replFactor, 1024L);
util.waitReplication(fs, path, replFactor);
StringBuilder sb = new StringBuilder();
for (LocatedBlock lb: util.getAllBlocks(fs, path)) {
sb.append(lb.getBlock().getLocalBlock().getBlockName() + " ");
}
String[] bIds = sb.toString().split(" ");

// Make sure datanode is HEALTHY before down.
String outStr = runFsck(configuration, 0, true, "/", "-blockId", bIds[0]);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));

FSNamesystem fsn = cluster.getNameNode().getNamesystem();
BlockManager bm = fsn.getBlockManager();
DatanodeManager dnm = bm.getDatanodeManager();
DatanodeDescriptor dn = dnm.getDatanode(cluster.getDataNodes().get(0)
.getDatanodeId());
final String dnName = dn.getXferAddr();

// Make the block on datanode enter stale state.
cluster.stopDataNode(0);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
DatanodeInfo datanodeInfo = null;
for (DatanodeInfo info : fs.getDataNodeStats()) {
if (dnName.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
if (datanodeInfo != null && datanodeInfo.isStale(5000)) {
return true;
}
} catch (Exception e) {
LOG.warn("Unexpected exception: " + e);
return false;
}
return false;
}
}, 500, 30000);
outStr = runFsck(configuration, 6, true, "/", "-blockId", bIds[0]);
assertTrue(outStr.contains(NamenodeFsck.STALE_STATUS));
} finally {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
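Purely as a sketch, the anonymous Supplier above can be collapsed into a lambda; this assumes the same GenericTestUtils.waitFor overload and behaves identically when at most one datanode matches dnName:

    // Compact equivalent of the wait loop above (illustrative only).
    GenericTestUtils.waitFor(() -> {
      try {
        for (DatanodeInfo info : fs.getDataNodeStats()) {
          if (dnName.equals(info.getXferAddr())) {
            return info.isStale(5000);
          }
        }
      } catch (IOException e) {
        LOG.warn("Unexpected exception: " + e);
      }
      return false;
    }, 500, 30000);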

/**
* Test for blockIdCK with block corruption.
*/
@@ -227,6 +227,19 @@ private static int getAppAttemptId(Configuration conf) {
MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
}

/**
* Build a qualified parent path for the temporary multipart upload commit
* directory built by {@link #getMultipartUploadCommitsDirectory(Configuration, String)}.
* @param conf configuration defining default FS.
* @param uuid uuid of job
* @return a path which can be used for temporary work
* @throws IOException on an IO failure.
*/
public static Path getStagingUploadsParentDirectory(Configuration conf,
String uuid) throws IOException {
return getMultipartUploadCommitsDirectory(conf, uuid).getParent();
}

/**
* Build a qualified temporary path for the multipart upload commit
* information in the cluster filesystem.
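A small usage sketch of the new helper (method names as in this hunk; the fully qualified Paths class and the bare Configuration are assumptions for illustration):

    import java.io.IOException;
    import java.util.UUID;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.commit.staging.Paths;

    // Sketch only: shows how the new helper relates to the existing one.
    public class StagingPathsSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String jobUUID = UUID.randomUUID().toString();

        Path uploadsDir = Paths.getMultipartUploadCommitsDirectory(conf, jobUUID);
        Path parentDir = Paths.getStagingUploadsParentDirectory(conf, jobUUID);

        // By construction (see the method body above), parentDir is
        // uploadsDir.getParent(); deleting it recursively removes the whole
        // per-job staging area that the committer cleanup below targets.
        System.out.println(uploadsDir + " -> parent: " + parentDir);
      }
    }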
@@ -501,8 +501,8 @@ public void cleanupStagingDirs() {

/**
* Staging committer cleanup includes calling wrapped committer's
* cleanup method, and removing all destination paths in the final
* filesystem.
* cleanup method, and removing staging uploads path and all
* destination paths in the final filesystem.
* @param commitContext commit context
* @param suppressExceptions should exceptions be suppressed?
* @throws IOException IO failures if exceptions are not suppressed.
@@ -515,6 +515,9 @@ protected void cleanup(CommitContext commitContext,
maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
() -> wrappedCommitter.cleanupJob(
commitContext.getJobContext()));
maybeIgnore(suppressExceptions, "Delete staging uploads path",
() -> deleteStagingUploadsParentDirectory(
commitContext.getJobContext()));
maybeIgnore(suppressExceptions, "Delete destination paths",
() -> deleteDestinationPaths(
commitContext.getJobContext()));
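Each cleanup step above is wrapped in maybeIgnore, so with suppressExceptions set a failure in one step (for example deleting the staging uploads path) is logged and the remaining steps still run. A generic sketch of that pattern, not the S3A implementation, is:

    import java.io.IOException;

    // Generic sketch of the log-and-continue wrapping used in cleanup().
    public class MaybeIgnoreSketch {

      @FunctionalInterface
      interface IoRunnable {
        void run() throws IOException;
      }

      static void maybeIgnore(boolean suppressExceptions, String action,
          IoRunnable operation) throws IOException {
        try {
          operation.run();
        } catch (IOException e) {
          if (!suppressExceptions) {
            throw e;
          }
          System.err.println(action + " failed, continuing: " + e);
        }
      }

      public static void main(String[] args) throws IOException {
        // With suppression on, the first failing step does not stop the second.
        maybeIgnore(true, "Delete staging uploads path", () -> {
          throw new IOException("simulated failure");
        });
        maybeIgnore(true, "Delete destination paths",
            () -> System.out.println("destination paths deleted"));
      }
    }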
@@ -543,11 +546,26 @@ protected void abortJobInternal(CommitContext commitContext,
}
}

/**
* Delete the multipart upload staging directory.
* @param context job context
* @throws IOException IO failure
*/
protected void deleteStagingUploadsParentDirectory(JobContext context)
throws IOException {
Path stagingUploadsPath = Paths.getStagingUploadsParentDirectory(
context.getConfiguration(), getUUID());
ignoreIOExceptions(LOG,
"Deleting staging uploads path", stagingUploadsPath.toString(),
() -> deleteWithWarning(
stagingUploadsPath.getFileSystem(getConf()),
stagingUploadsPath,
true));
}

/**
* Delete the working paths of a job.
* <ol>
* <li>The job attempt path</li>
* <li>{@code $dest/__temporary}</li>
* <li>the local working directory for staged files</li>
* </ol>
@@ -556,14 +574,6 @@ protected void abortJobInternal(CommitContext commitContext,
* @throws IOException IO failure
*/
protected void deleteDestinationPaths(JobContext context) throws IOException {
Path attemptPath = getJobAttemptPath(context);
ignoreIOExceptions(LOG,
"Deleting Job attempt Path", attemptPath.toString(),
() -> deleteWithWarning(
getJobAttemptFileSystem(context),
attemptPath,
true));

// delete the __temporary directory. This will cause problems
// if there is >1 task targeting the same dest dir
deleteWithWarning(getDestFS(),
@@ -403,6 +403,30 @@ public JobData(Job job,
this.committer = committer;
conf = job.getConfiguration();
}

public Job getJob() {
return job;
}

public JobContext getJContext() {
return jContext;
}

public TaskAttemptContext getTContext() {
return tContext;
}

public AbstractS3ACommitter getCommitter() {
return committer;
}

public Configuration getConf() {
return conf;
}

public Path getWrittenTextPath() {
return writtenTextPath;
}
}

/**
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.UUID;

import org.junit.Test;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -141,6 +143,74 @@ protected void validateTaskAttemptWorkingDirectory(final AbstractS3ACommitter co
assertEquals("file", wd.toUri().getScheme());
}

@Test
public void testStagingUploadsDirectoryCleanedUp() throws Exception {
describe("Assert that the staging uploads directory is cleaned up after successful commit");
JobData jobData = startJob(false);
JobContext jContext = jobData.getJContext();
TaskAttemptContext tContext = jobData.getTContext();
StagingCommitter committer = (StagingCommitter) jobData.getCommitter();

Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory(
jContext.getConfiguration(),
committer.getUUID());

ContractTestUtils.assertPathExists(
stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
"staging uploads path must exist after setupJob",
stagingUploadsDir
);

// write output
writeTextOutput(tContext);

// do commit
committer.commitTask(tContext);

commitJob(committer, jContext);

ContractTestUtils.assertPathDoesNotExist(
stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
"staging uploads path must not exist after commitJob",
stagingUploadsDir
);
}

@Test
public void testStagingUploadsDirectoryCleanedUpWithFailure() throws Exception {
describe("Assert that the staging uploads directory is cleaned up after failed commit");
JobData jobData = startJob(new FailingCommitterFactory(), false);
JobContext jContext = jobData.getJContext();
TaskAttemptContext tContext = jobData.getTContext();
StagingCommitter committer = (StagingCommitter) jobData.getCommitter();

Path stagingUploadsDir = Paths.getStagingUploadsParentDirectory(
jContext.getConfiguration(),
committer.getUUID());

ContractTestUtils.assertPathExists(
stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
"staging uploads path must exist after setupJob",
stagingUploadsDir
);

// do commit
committer.commitTask(tContext);

// now fail job
expectSimulatedFailureOnJobCommit(jContext, committer);

commitJob(committer, jContext);

expectJobCommitToFail(jContext, committer);

ContractTestUtils.assertPathDoesNotExist(
stagingUploadsDir.getFileSystem(jContext.getConfiguration()),
"staging uploads path must not exist after commitJob",
stagingUploadsDir
);
}

/**
* The class provides a overridden implementation of commitJobInternal which
* causes the commit failed for the first time then succeed.