diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c5e6e09a835ebf..b5961bff641972 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -52,6 +52,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -235,6 +236,7 @@
 import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
 import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
@@ -3906,6 +3908,21 @@ public void access(final Path f, final FsAction mode)
   @Retries.RetryTranslated
   public FileStatus getFileStatus(final Path f) throws IOException {
     Path path = qualify(f);
+    if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {
+      // Some downstream apps may call getFileStatus on a magic path to get the file size.
+      // When commit data is stored in memory, construct a dummy S3AFileStatus with the
+      // correct file size fetched from memory.
+      if (InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().containsKey(path)) {
+        long len = InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().get(path);
+        return new S3AFileStatus(len,
+            0L,
+            path,
+            getDefaultBlockSize(path),
+            username,
+            null,
+            null);
+      }
+    }
     return trackDurationAndSpan(
         INVOCATION_GET_FILE_STATUS, path, () ->
             innerGetFileStatus(path, false, StatusProbeEnum.ALL));
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 52df58d6a4b43f..baec79fd79aa4b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -242,6 +242,15 @@ private CommitConstants() {
    */
   public static final int DEFAULT_COMMITTER_THREADS = 32;
 
+  /** Flag to enable in-memory tracking of magic commit data: {@value}. */
+  public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED =
+      "fs.s3a.committer.magic.track.commits.in.memory.enabled";
+
+  /** Default value of {@link #FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED}: {@value}. */
+  public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
+      false;
+
   /**
    * Path in the cluster filesystem for temporary data: {@value}.
    * This is for HDFS, not the local filesystem.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
index e6524c91961dc4..a2c11a15c13fe2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
@@ -20,17 +20,19 @@
 
 import java.util.List;
 
+import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
+import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.Statistic;
-import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
 import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
 import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
 
 import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 
 /**
  * Adds the code needed for S3A to support magic committers.
@@ -105,13 +107,15 @@ public PutTracker createTracker(Path path, String key,
         String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
         getStoreContext().incrementStatistic(
             Statistic.COMMITTER_MAGIC_FILES_CREATED);
-        tracker = new MagicCommitTracker(path,
-            getStoreContext().getBucket(),
-            key,
-            destKey,
-            pendingsetPath,
-            owner.getWriteOperationHelper(),
-            trackerStatistics);
+        if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) {
+          tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(),
+              key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
+              trackerStatistics);
+        } else {
+          tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(),
+              key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
+              trackerStatistics);
+        }
         LOG.debug("Created {}", tracker);
       } else {
         LOG.warn("File being created has a \"magic\" path, but the filesystem"
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java
new file mode 100644
index 00000000000000..9ab5eca1fb577c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath;
+
+/**
+ * InMemoryMagicCommitTracker stores the commit data in memory.
+ * The commit data and the associated map entries are removed from
+ * memory when the task is committed or aborted.
+ */
+public class InMemoryMagicCommitTracker extends MagicCommitTracker {
+
+  // stores the mapping from taskAttemptId to commit data
+  private static Map<String, List<SinglePendingCommit>>
+      taskAttemptIdToMpuMetadataMap = new ConcurrentHashMap<>();
+
+  // stores the mapping from a path to its length/size
+  private static Map<Path, Long> taskAttemptIdToBytesWritten = new ConcurrentHashMap<>();
+
+  // stores the mapping from taskAttemptId to path
+  private static Map<String, List<Path>> taskAttemptIdToPath = new ConcurrentHashMap<>();
+
+  public InMemoryMagicCommitTracker(Path path,
+      String bucket,
+      String originalDestKey,
+      String destKey,
+      String pendingsetKey,
+      WriteOperationHelper writer,
+      PutTrackerStatistics trackerStatistics) {
+    super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics);
+  }
+
+  @Override
+  public boolean aboutToComplete(String uploadId,
+      List<CompletedPart> parts,
+      long bytesWritten,
+      final IOStatistics iostatistics)
+      throws IOException {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
+        "empty/null upload ID: " + uploadId);
+    Preconditions.checkArgument(parts != null, "No uploaded parts list");
+    Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save");
+
+    // build the commit summary
+    SinglePendingCommit commitData = new SinglePendingCommit();
+    commitData.touch(System.currentTimeMillis());
+    commitData.setDestinationKey(getDestKey());
+    commitData.setBucket(getBucket());
+    commitData.setUri(getPath().toUri().toString());
+    commitData.setUploadId(uploadId);
+    commitData.setText("");
+    commitData.setLength(bytesWritten);
+    commitData.bindCommitData(parts);
+    commitData.setIOStatistics(new IOStatisticsSnapshot(iostatistics));
+
+    // extract the taskAttemptId from the path
+    String taskAttemptId = extractTaskAttemptIdFromPath(getPath());
+
+    // store the commit data with taskAttemptId as the key
+    taskAttemptIdToMpuMetadataMap.computeIfAbsent(taskAttemptId,
+        k -> Collections.synchronizedList(new ArrayList<>())).add(commitData);
+
+    // store the bytesWritten (length) for the corresponding file
+    taskAttemptIdToBytesWritten.put(getPath(), bytesWritten);
+
+    // store the mapping between taskAttemptId and path
+    // This information is used for removing entries from
+    // the map once the taskAttempt is completed/committed.
+    taskAttemptIdToPath.computeIfAbsent(taskAttemptId,
+        k -> Collections.synchronizedList(new ArrayList<>())).add(getPath());
+
+    LOG.info("Commit metadata for {} part(s) in {}. Size: {} byte(s) "
+        + "for taskAttemptId {} is stored in memory",
+        parts.size(), getPendingPartKey(), bytesWritten, taskAttemptId);
+    LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
+        getPath(), getPendingPartKey(), commitData);
+
+    return false;
+  }
+
+  public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetadataMap() {
+    return taskAttemptIdToMpuMetadataMap;
+  }
+
+  public static Map<Path, Long> getTaskAttemptIdToBytesWritten() {
+    return taskAttemptIdToBytesWritten;
+  }
+
+  public static Map<String, List<Path>> getTaskAttemptIdToPath() {
+    return taskAttemptIdToPath;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
index b2e703e1b088d7..2e439669328232 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
@@ -18,37 +18,22 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import software.amazon.awssdk.services.s3.model.CompletedPart;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.Retries;
-import org.apache.hadoop.fs.s3a.S3ADataBlocks;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
-import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
-import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
-import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
-import org.apache.hadoop.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
-import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
 
 /**
  * Put tracker for Magic commits.
@@ -65,7 +50,7 @@ public class MagicCommitTracker extends PutTracker {
   private final Path path;
   private final WriteOperationHelper writer;
   private final String bucket;
-  private static final byte[] EMPTY = new byte[0];
+  protected static final byte[] EMPTY = new byte[0];
   private final PutTrackerStatistics trackerStatistics;
 
   /**
@@ -118,76 +103,21 @@ public boolean outputImmediatelyVisible() {
 
   /**
    * Complete operation: generate the final commit data, put it.
-   * @param uploadId Upload ID
-   * @param parts list of parts
+   *
+   * @param uploadId  Upload ID
+   * @param parts  list of parts
    * @param bytesWritten bytes written
    * @param iostatistics nullable IO statistics
    * @return false, indicating that the commit must fail.
-   * @throws IOException any IO problem.
+   * @throws IOException  any IO problem.
    * @throws IllegalArgumentException bad argument
    */
   @Override
   public boolean aboutToComplete(String uploadId,
       List<CompletedPart> parts,
       long bytesWritten,
-      final IOStatistics iostatistics)
-      throws IOException {
-    Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
-        "empty/null upload ID: "+ uploadId);
-    Preconditions.checkArgument(parts != null,
-        "No uploaded parts list");
-    Preconditions.checkArgument(!parts.isEmpty(),
-        "No uploaded parts to save");
-
-    // put a 0-byte file with the name of the original under-magic path
-    // Add the final file length as a header
-    // this is done before the task commit, so its duration can be
-    // included in the statistics
-    Map<String, String> headers = new HashMap<>();
-    headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
-    PutObjectRequest originalDestPut = writer.createPutObjectRequest(
-        originalDestKey,
-        0,
-        new PutObjectOptions(true, null, headers), false);
-    upload(originalDestPut, new ByteArrayInputStream(EMPTY));
-
-    // build the commit summary
-    SinglePendingCommit commitData = new SinglePendingCommit();
-    commitData.touch(System.currentTimeMillis());
-    commitData.setDestinationKey(getDestKey());
-    commitData.setBucket(bucket);
-    commitData.setUri(path.toUri().toString());
-    commitData.setUploadId(uploadId);
-    commitData.setText("");
-    commitData.setLength(bytesWritten);
-    commitData.bindCommitData(parts);
-    commitData.setIOStatistics(
-        new IOStatisticsSnapshot(iostatistics));
-
-    byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer());
-    LOG.info("Uncommitted data pending to file {};"
-        + " commit metadata for {} parts in {}. size: {} byte(s)",
-        path.toUri(), parts.size(), pendingPartKey, bytesWritten);
-    LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
-        path, pendingPartKey, commitData);
-    PutObjectRequest put = writer.createPutObjectRequest(
-        pendingPartKey,
-        bytes.length, null, false);
-    upload(put, new ByteArrayInputStream(bytes));
+      final IOStatistics iostatistics) throws IOException {
     return false;
-
-  }
-  /**
-   * PUT an object.
-   * @param request the request
-   * @param inputStream input stream of data to be uploaded
-   * @throws IOException on problems
-   */
-  @Retries.RetryTranslated
-  private void upload(PutObjectRequest request, InputStream inputStream) throws IOException {
-    trackDurationOfInvocation(trackerStatistics, COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
-        () -> writer.putObject(request, PutObjectOptions.keepingDirs(),
-            new S3ADataBlocks.BlockUploadData(inputStream), false, null));
   }
 
   @Override
@@ -201,4 +131,28 @@ public String toString() {
     sb.append('}');
     return sb.toString();
   }
+
+  public String getOriginalDestKey() {
+    return originalDestKey;
+  }
+
+  public String getPendingPartKey() {
+    return pendingPartKey;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public String getBucket() {
+    return bucket;
+  }
+
+  public WriteOperationHelper getWriter() {
+    return writer;
+  }
+
+  public PutTrackerStatistics getTrackerStatistics() {
+    return trackerStatistics;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
new file mode 100644
index 00000000000000..f923e4df409722
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;
+
+import java.util.List;
+
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for {@link MagicCommitTracker} and its subclasses.
+ */
+public final class MagicCommitTrackerUtils {
+
+  private MagicCommitTrackerUtils() {
+  }
+
+  /**
+   * The magic path is of the following format:
+   * "s3://bucket-name/table-path/__magic_jobId/job-id/taskAttempt/id/tasks/taskAttemptId"
+   * so the third child under the "__magic" element is the task attempt id.
+   * @param path Path
+   * @return taskAttemptId
+   */
+  public static String extractTaskAttemptIdFromPath(Path path) {
+    List<String> elementsInPath = MagicCommitPaths.splitPathToElements(path);
+    List<String> childrenOfMagicPath = MagicCommitPaths.magicPathChildren(elementsInPath);
+
+    checkArgument(childrenOfMagicPath.size() >= 4, "Magic Path is invalid");
+    // the 3rd child (index 3) of the magic path is the taskAttemptId
+    return childrenOfMagicPath.get(3);
+  }
+
+  /**
+   * Is in-memory tracking of magic commit data enabled?
+   * @param conf Configuration
+   * @return true if in-memory tracking of commit data is enabled.
+   */
+  public static boolean isTrackMagicCommitsInMemoryEnabled(Configuration conf) {
+    return conf.getBoolean(
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 518831b7d4330d..974d0e5d308a5e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -48,8 +50,8 @@
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
-import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
 import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 
 /**
@@ -192,23 +194,9 @@ public void commitTask(TaskAttemptContext context) throws IOException {
    */
   private PendingSet innerCommitTask(
       TaskAttemptContext context) throws IOException {
-    Path taskAttemptPath = getTaskAttemptPath(context);
     // load in all pending commits.
-    CommitOperations actions = getCommitOperations();
-    PendingSet pendingSet;
+    PendingSet pendingSet = loadPendingCommits(context);
     try (CommitContext commitContext = initiateTaskOperation(context)) {
-      Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
-          loaded = actions.loadSinglePendingCommits(
-              taskAttemptPath, true, commitContext);
-      pendingSet = loaded.getKey();
-      List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
-      if (!failures.isEmpty()) {
-        // At least one file failed to load
-        // revert all which did; report failure with first exception
-        LOG.error("At least one commit file could not be read: failing");
-        abortPendingUploads(commitContext, pendingSet.getCommits(), true);
-        throw failures.get(0).getValue();
-      }
       // patch in IDs
       String jobId = getUUID();
       String taskId = String.valueOf(context.getTaskAttemptID());
@@ -248,6 +236,79 @@ private PendingSet innerCommitTask(
     return pendingSet;
   }
 
+  /**
+   * Loads pending commits from memory or from the remote store (S3), based on the config.
+   * @param context TaskAttemptContext
+   * @return all pending commit data for the given TaskAttemptContext
+   * @throws IOException failure to load the pending commits
+   */
+  protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException {
+    PendingSet pendingSet = new PendingSet();
+    if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+      // load from memory
+      List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+
+      for (SinglePendingCommit singleCommit : pendingCommits) {
+        // aggregate stats
+        pendingSet.getIOStatistics()
+            .aggregate(singleCommit.getIOStatistics());
+        // then clear so they aren't marshalled again.
+        singleCommit.getIOStatistics().clear();
+      }
+      pendingSet.setCommits(pendingCommits);
+    } else {
+      // load from the remote store
+      CommitOperations actions = getCommitOperations();
+      Path taskAttemptPath = getTaskAttemptPath(context);
+      try (CommitContext commitContext = initiateTaskOperation(context)) {
+        Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded =
+            actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext);
+        pendingSet = loaded.getKey();
+        List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
+        if (!failures.isEmpty()) {
+          // At least one file failed to load
+          // revert all which did; report failure with first exception
+          LOG.error("At least one commit file could not be read: failing");
+          abortPendingUploads(commitContext, pendingSet.getCommits(), true);
+          throw failures.get(0).getValue();
+        }
+      }
+    }
+    return pendingSet;
+  }
+
+  private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context)
+      throws FileNotFoundException {
+    String taskAttemptId = String.valueOf(context.getTaskAttemptID());
+    // get all the pending commit metadata associated with the taskAttemptId.
+    // This will also remove the entry from the map.
+    List<SinglePendingCommit> pendingCommits =
+        InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetadataMap().remove(taskAttemptId);
+    // get all the paths/files associated with the taskAttemptId.
+    // This will also remove the entry from the map.
+    List<Path> pathsAssociatedWithTaskAttemptId =
+        InMemoryMagicCommitTracker.getTaskAttemptIdToPath().remove(taskAttemptId);
+
+    // remove the bytes-written entry for each path,
+    // so that there is no memory leak.
+    if (pathsAssociatedWithTaskAttemptId != null) {
+      for (Path path : pathsAssociatedWithTaskAttemptId) {
+        boolean cleared =
+            InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().remove(path) != null;
+        LOG.debug("Removed path: {} from memory: {}", path, cleared);
+      }
+    } else {
+      LOG.debug("No paths to remove for taskAttemptId: {}", taskAttemptId);
+    }
+
+    if (pendingCommits == null || pendingCommits.isEmpty()) {
+      LOG.info("No commit data in memory for taskAttemptId: {}", taskAttemptId);
+      return new ArrayList<>();
+    }
+
+    return pendingCommits;
+  }
+
   /**
    * Abort a task. Attempt load then abort all pending files,
    * then try to delete the task attempt path.
@@ -264,9 +325,14 @@ public void abortTask(TaskAttemptContext context) throws IOException {
     try (DurationInfo d = new DurationInfo(LOG,
         "Abort task %s", context.getTaskAttemptID());
         CommitContext commitContext = initiateTaskOperation(context)) {
-      getCommitOperations().abortAllSinglePendingCommits(attemptPath,
-          commitContext,
-          true);
+      if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+        List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+        for (SinglePendingCommit singleCommit : pendingCommits) {
+          commitContext.abortSingleCommit(singleCommit);
+        }
+      } else {
+        getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true);
+      }
     } finally {
       deleteQuietly(
           attemptPath.getFileSystem(context.getConfiguration()),
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
new file mode 100644
index 00000000000000..c27abf66bfe226
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3ADataBlocks;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.util.Preconditions;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+
+/**
+ * Stores the commit data under the magic path.
+ */
+public class S3MagicCommitTracker extends MagicCommitTracker {
+
+  public S3MagicCommitTracker(Path path,
+      String bucket,
+      String originalDestKey,
+      String destKey,
+      String pendingsetKey,
+      WriteOperationHelper writer,
+      PutTrackerStatistics trackerStatistics) {
+    super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics);
+  }
+
+  @Override
+  public boolean aboutToComplete(String uploadId,
+      List<CompletedPart> parts,
+      long bytesWritten,
+      final IOStatistics iostatistics)
+      throws IOException {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
+        "empty/null upload ID: " + uploadId);
+    Preconditions.checkArgument(parts != null,
+        "No uploaded parts list");
+    Preconditions.checkArgument(!parts.isEmpty(),
+        "No uploaded parts to save");
+
+    // put a 0-byte file with the name of the original under-magic path
+    // Add the final file length as a header
+    // this is done before the task commit, so its duration can be
+    // included in the statistics
+    Map<String, String> headers = new HashMap<>();
+    headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
+    PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
+        getOriginalDestKey(),
+        0,
+        new PutObjectOptions(true, null, headers), false);
+    upload(originalDestPut, new ByteArrayInputStream(EMPTY));
+
+    // build the commit summary
+    SinglePendingCommit commitData = new SinglePendingCommit();
+    commitData.touch(System.currentTimeMillis());
+    commitData.setDestinationKey(getDestKey());
+    commitData.setBucket(getBucket());
+    commitData.setUri(getPath().toUri().toString());
+    commitData.setUploadId(uploadId);
+    commitData.setText("");
+    commitData.setLength(bytesWritten);
+    commitData.bindCommitData(parts);
+    commitData.setIOStatistics(
+        new IOStatisticsSnapshot(iostatistics));
+
+    byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer());
+    LOG.info("Uncommitted data pending to file {};"
+        + " commit metadata for {} parts in {}. size: {} byte(s)",
+        getPath().toUri(), parts.size(), getPendingPartKey(), bytesWritten);
+    LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
+        getPath(), getPendingPartKey(), commitData);
+    PutObjectRequest put = getWriter().createPutObjectRequest(
+        getPendingPartKey(),
+        bytes.length, null, false);
+    upload(put, new ByteArrayInputStream(bytes));
+    return false;
+  }
+
+  /**
+   * PUT an object.
+   * @param request the request
+   * @param inputStream input stream of data to be uploaded
+   * @throws IOException on problems
+   */
+  @Retries.RetryTranslated
+  private void upload(PutObjectRequest request, InputStream inputStream) throws IOException {
+    trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
+        () -> getWriter().putObject(request, PutObjectOptions.keepingDirs(),
+            new S3ADataBlocks.BlockUploadData(inputStream), false, null));
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index 4c14921c4b4aad..712866d8123052 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -362,6 +362,13 @@ the magic directory path rewriting is enabled by default.
 
 The Magic Committer has not been field tested to the extent of Netflix's committer;
 consider it the least mature of the committers.
+When only a small number of files are written, the Magic committer can store the commit data in memory; this speeds up the task commit operation and reduces the number of S3 requests. It is enabled with the following property:
+```xml
+<property>
+  <name>fs.s3a.committer.magic.track.commits.in.memory.enabled</name>
+  <value>true</value>
+</property>
+```
 
 ### Which Committer to Use?
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 67c88039aad1b5..3a7cceb2369ee0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -82,6 +82,7 @@
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_TASKS_SUCCEEDED;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -906,7 +907,14 @@ public void testCommitterWithDuplicatedCommit() throws Exception {
     assertNoMultipartUploadsPending(outDir);
 
     // commit task to fail on retry
-    expectFNFEonTaskCommit(committer, tContext);
+    // FNFE is not thrown by the Magic committer when in-memory
+    // commit data is enabled, so skip that check here.
+    boolean skipExpectFNFE = committer instanceof MagicS3GuardCommitter &&
+        isTrackMagicCommitsInMemoryEnabled(tContext.getConfiguration());
+
+    if (!skipExpectFNFE) {
+      expectFNFEonTaskCommit(committer, tContext);
+    }
   }
 
   /**
@@ -1422,7 +1430,10 @@ public void testOutputFormatIntegration() throws Throwable {
     validateTaskAttemptPathDuringWrite(dest, expectedLength, jobData.getCommitter().getUUID());
     recordWriter.close(tContext);
     // at this point
-    validateTaskAttemptPathAfterWrite(dest, expectedLength);
+    // Skip validation when commit data is stored in memory
+    if (!isTrackMagicCommitsInMemoryEnabled(conf)) {
+      validateTaskAttemptPathAfterWrite(dest, expectedLength);
+    }
     assertTrue("Committer does not have data to commit " + committer,
         committer.needsTaskCommit(tContext));
     commitTask(committer, tContext);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java
new file mode 100644
index 00000000000000..a08f8d2d34b707
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.apache.hadoop.fs.s3a.commit.AbstractCommitITest.randomJobId;
+
+/**
+ * Class to test {@link MagicCommitTrackerUtils}.
+ */
+public final class TestMagicCommitTrackerUtils {
+
+  private String jobId;
+  private String attemptId;
+  private TaskAttemptID taskAttemptId;
+  private static final Path DEST_PATH = new Path("s3://dummyBucket/dummyTable");
+
+  @Before
+  public void setup() throws Exception {
+    jobId = randomJobId();
+    attemptId = "attempt_" + jobId + "_m_000000_0";
+    taskAttemptId = TaskAttemptID.forName(attemptId);
+  }
+
+  @Test
+  public void testExtractTaskAttemptIdFromPath() {
+    TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(
+        new Configuration(),
+        taskAttemptId);
+    Path path = CommitUtilsWithMR
+        .getBaseMagicTaskAttemptPath(taskAttemptContext, "00001", DEST_PATH);
+    assertEquals("TaskAttemptId didn't match", attemptId,
+        MagicCommitTrackerUtils.extractTaskAttemptIdFromPath(path));
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index fa963a4b97064d..d18a4de2c476eb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -20,8 +20,11 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
@@ -39,6 +42,8 @@
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -48,8 +53,11 @@
 /**
  * Test the magic committer's commit protocol.
  */
+@RunWith(Parameterized.class)
 public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
 
+  private final boolean trackCommitsInMemory;
+
   @Override
   protected String suitename() {
     return "ITestMagicCommitProtocol";
@@ -71,6 +79,26 @@ public void setup() throws Exception {
     CommitUtils.verifyIsMagicCommitFS(getFileSystem());
   }
 
+  @Parameterized.Parameters(name = "track-commit-in-memory-{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {false},
+        {true}
+    });
+  }
+
+  public ITestMagicCommitProtocol(boolean trackCommitsInMemory) {
+    this.trackCommitsInMemory = trackCommitsInMemory;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, trackCommitsInMemory);
+
+    return conf;
+  }
+
   @Override
   public void assertJobAbortCleanedUp(JobData jobData)
       throws Exception {
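
---

Usage note (not part of the patch): a minimal sketch of how a client could turn on the new switch, assuming the magic committer is already selected for the target bucket; the surrounding job-submission plumbing is omitted and the class name is illustrative only.

```java
import org.apache.hadoop.conf.Configuration;

public class InMemoryTrackingExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // pre-existing S3A settings: select the magic committer for s3a output
    conf.set("fs.s3a.committer.name", "magic");
    conf.setBoolean("fs.s3a.committer.magic.enabled", true);
    // new switch from this patch: keep the .pending commit metadata in task
    // memory instead of writing it to S3. Suited to tasks writing few files,
    // since the metadata is held on-heap until the task is committed or aborted.
    conf.setBoolean("fs.s3a.committer.magic.track.commits.in.memory.enabled", true);
    // pass `conf` to the MR/Spark job as usual; task commit then reads the
    // commit metadata back from InMemoryMagicCommitTracker's static maps.
  }
}
```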