Skip to content

Commit

Permalink
HADOOP-17122: Preserving Directory Attributes in DistCp with Atomic C…
Browse files Browse the repository at this point in the history
…opy (apache#2133)


Contributed by Swaminathan Balachandran
  • Loading branch information
swamirishi authored and swaminathan-b committed Aug 22, 2020
1 parent e0d8423 commit 6e5fae5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,10 @@ private void preserveFileAttributesForDirectories(Configuration conf)
SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(sourceListing));
long totalLen = clusterFS.getFileStatus(sourceListing).getLen();

Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
// For Atomic Copy the Final & Work Path are different & atomic copy has
// already moved it to final path.
Path targetRoot =
new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));

long preservedEntries = 0;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import java.util.*;

import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH;
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_WORK_PATH;
import static org.apache.hadoop.tools.util.TestDistCpUtils.*;

public class TestCopyCommitter {
Expand Down Expand Up @@ -160,10 +162,10 @@ public void testPreserveStatus() throws IOException {
context.setTargetPathExists(false);

CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
Path listingFile = new Path("/tmp1/" + rand.nextLong());
listing.buildListing(listingFile, context);

conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

committer.commitJob(jobContext);
checkDirectoryPermissions(fs, targetBase, sourcePerm);
Expand All @@ -179,6 +181,45 @@ public void testPreserveStatus() throws IOException {

}

@Test
public void testPreserveStatusWithAtomicCommit() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(
taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
Configuration conf = jobContext.getConfiguration();
String sourceBase;
String workBase;
String targetBase;
FileSystem fs = null;
try {
OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
fs = FileSystem.get(conf);
FsPermission sourcePerm = new FsPermission((short) 511);
FsPermission initialPerm = new FsPermission((short) 448);
sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
workBase = TestDistCpUtils.createTestSetup(fs, initialPerm);
targetBase = "/tmp1/" + rand.nextLong();
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)), new Path("/out"))
.preserve(FileAttribute.PERMISSION).build();
options.appendToConf(conf);
final DistCpContext context = new DistCpContext(options);
context.setTargetPathExists(false);
CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
Path listingFile = new Path("/tmp1/" + rand.nextLong());
listing.buildListing(listingFile, context);
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_WORK_PATH, workBase);
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
committer.commitJob(jobContext);
checkDirectoryPermissions(fs, targetBase, sourcePerm);
} finally {
TestDistCpUtils.delete(fs, "/tmp1");
conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
}
}

@Test
public void testDeleteMissing() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
Expand Down Expand Up @@ -207,8 +248,8 @@ public void testDeleteMissing() throws IOException {
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);

conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

committer.commitJob(jobContext);
verifyFoldersAreInSync(fs, targetBase, sourceBase);
Expand Down Expand Up @@ -256,8 +297,8 @@ public void testPreserveTimeWithDeleteMiss() throws IOException {
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);

conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

Path sourceListing = new Path(
conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
Expand Down Expand Up @@ -320,8 +361,8 @@ public void testDeleteMissingFlatInterleavedFiles() throws IOException {
Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);

conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

committer.commitJob(jobContext);
verifyFoldersAreInSync(fs, targetBase, sourceBase);
Expand Down Expand Up @@ -353,8 +394,8 @@ public void testAtomicCommitMissingFinal() throws IOException {
fs = FileSystem.get(conf);
fs.mkdirs(new Path(workPath));

conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);

assertPathExists(fs, "Work path", new Path(workPath));
Expand Down Expand Up @@ -391,8 +432,8 @@ public void testAtomicCommitExistingFinal() throws IOException {
fs.mkdirs(new Path(workPath));
fs.mkdirs(new Path(finalPath));

conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);

assertPathExists(fs, "Work path", new Path(workPath));
Expand Down Expand Up @@ -463,8 +504,8 @@ private void testCommitWithChecksumMismatch(boolean skipCrc)
+ String.valueOf(rand.nextLong()));
listing.buildListing(listingFile, context);

conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

OutputCommitter committer = new CopyCommitter(
null, taskAttemptContext);
Expand Down

0 comments on commit 6e5fae5

Please sign in to comment.