HADOOP-19047: Support InMemory Tracking Of S3A Magic Commits
shameersss1 committed Jan 19, 2024
1 parent f6fea5d commit 3d0489f
Showing 7 changed files with 349 additions and 89 deletions.
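The new behaviour is opt-in, gated behind a configuration flag defined in CommitConstants below and off by default. A minimal sketch of enabling it from job setup code (the property string is taken from this diff; the standalone main and class name are illustrative, assuming only hadoop-common on the classpath):

import org.apache.hadoop.conf.Configuration;

public class EnableInMemoryMagicTracking {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED in CommitConstants
    conf.setBoolean("fs.s3a.committer.magic.track.commits.in.memory.enabled", true);
    // the magic committer itself must also be in use for the flag to matter
    System.out.println(conf.getBoolean(
        "fs.s3a.committer.magic.track.commits.in.memory.enabled", false)); // true
  }
}

Because the tracked commit data lives in static maps inside the writer's JVM, this mode only helps when the tasks that write the files and the task commit run in the same process, as they do in Spark executors.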
File: S3AFileSystem.java
@@ -52,6 +52,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -3896,6 +3898,19 @@ public void access(final Path f, final FsAction mode)
@Retries.RetryTranslated
public FileStatus getFileStatus(final Path f) throws IOException {
Path path = qualify(f);
// TODO: consult the in-memory tracking flag before probing the map
if (isMagicCommitPath(path)) {
  // a magic-path file still tracked in memory: serve its length from
  // the tracker map and skip the S3 probes entirely
  Long len = InMemoryMagicCommitTracker.taskAttemptIdToBytesWritten.get(path);
  if (len != null) {
    return new S3AFileStatus(len,
        0L,
        path,
        getDefaultBlockSize(path),
        username,
        null,
        null);
  }
}
return trackDurationAndSpan(
INVOCATION_GET_FILE_STATUS, path, () ->
innerGetFileStatus(path, false, StatusProbeEnum.ALL));
File: CommitConstants.java
@@ -242,6 +242,12 @@ private CommitConstants() {
*/
public static final int DEFAULT_COMMITTER_THREADS = 32;


/**
 * Flag to enable tracking of magic commit data in memory: {@value}.
 */
public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED =
    "fs.s3a.committer.magic.track.commits.in.memory.enabled";

/**
 * Default value of {@link #FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED}: {@value}.
 */
public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
    false;

/**
* Path in the cluster filesystem for temporary data: {@value}.
* This is for HDFS, not the local filesystem.
File: MagicCommitIntegration.java
@@ -20,6 +20,8 @@

import java.util.List;

import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -105,13 +107,18 @@ public PutTracker createTracker(Path path, String key,
String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
getStoreContext().incrementStatistic(
Statistic.COMMITTER_MAGIC_FILES_CREATED);
-      tracker = new MagicCommitTracker(path,
-          getStoreContext().getBucket(),
-          key,
-          destKey,
-          pendingsetPath,
-          owner.getWriteOperationHelper(),
-          trackerStatistics);
+      boolean trackMagicCommitsInMemory = getStoreContext().getConfiguration().getBoolean(
+          CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
+          CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT);
+      if (trackMagicCommitsInMemory) {
+        tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(),
+            key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
+            trackerStatistics);
+      } else {
+        tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(),
+            key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
+            trackerStatistics);
+      }
LOG.debug("Created {}", tracker);
} else {
LOG.warn("File being created has a \"magic\" path, but the filesystem"
File: InMemoryMagicCommitTracker.java
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.commit.magic;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import software.amazon.awssdk.services.s3.model.CompletedPart;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hadoop.util.Preconditions.checkArgument;

/**
 * A MagicCommitTracker which stores the commit data of a magic write
 * in memory, to be retrieved and committed by the task committer,
 * instead of persisting it to S3 as a .pending file.
 */
public class InMemoryMagicCommitTracker extends MagicCommitTracker {

  // maps a task attempt ID to the pending commits of its multipart uploads
  public static Map<String, List<SinglePendingCommit>> taskAttemptIdToMpuMetadataMap
      = new ConcurrentHashMap<>();

  // maps the path of a magic file to the number of bytes written to it
  public static Map<Path, Long> taskAttemptIdToBytesWritten
      = new ConcurrentHashMap<>();

  // maps a task attempt ID to the magic paths it has written
  public static Map<String, List<Path>> taskAttemptIdToPath = new ConcurrentHashMap<>();

public InMemoryMagicCommitTracker(Path path,
String bucket,
String originalDestKey,
String destKey,
String pendingsetKey,
WriteOperationHelper writer,
PutTrackerStatistics trackerStatistics) {
super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics);
}

@Override
public boolean aboutToComplete(String uploadId,
List<CompletedPart> parts,
long bytesWritten,
final IOStatistics iostatistics)
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId), "empty/null upload ID: " + uploadId);
Preconditions.checkArgument(parts != null, "No uploaded parts list");
Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save");

// build the commit summary
SinglePendingCommit commitData = new SinglePendingCommit();
commitData.touch(System.currentTimeMillis());
commitData.setDestinationKey(getDestKey());
commitData.setBucket(bucket);
commitData.setUri(path.toUri().toString());
commitData.setUploadId(uploadId);
commitData.setText("");
commitData.setLength(bytesWritten);
commitData.bindCommitData(parts);
commitData.setIOStatistics(new IOStatisticsSnapshot(iostatistics));

// extract the taskAttemptId from the path
String taskAttemptId = extractTaskAttemptIdFromPath(path);

// store the commit data with taskAttemptId as the key
taskAttemptIdToMpuMetadataMap.computeIfAbsent(taskAttemptId,
    k -> new ArrayList<>()).add(commitData);

// store the bytes written for the corresponding file
taskAttemptIdToBytesWritten.put(path, bytesWritten);

// store the mapping between taskAttemptId and path
// This information is used for removing entries from
// the map once the taskAttempt is completed/committed.
taskAttemptIdToPath.computeIfAbsent(taskAttemptId,
k -> new ArrayList<>()).add(path);

return false;
}

  /**
   * The magic path is of the format
   * {@code "s3://bucket-name/table-path/__magic_<jobId>/job-id/tasks/taskAttemptId/__base/..."},
   * so the third child of the "__magic" element is the task attempt id.
   * @param path the magic path of a file being written
   * @return the task attempt id
   */
  private static String extractTaskAttemptIdFromPath(Path path) {
    List<String> elementsInPath = MagicCommitPaths.splitPathToElements(path);
    List<String> childrenOfMagicPath = MagicCommitPaths.magicPathChildren(elementsInPath);

    // the list starts with the __magic element itself, so the task attempt id,
    // its third child, sits at index 3: that requires at least 4 elements
    checkArgument(childrenOfMagicPath.size() >= 4, "Magic Path is invalid");
    return childrenOfMagicPath.get(3);
  }
}
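As a sanity check on the index arithmetic above, here is a self-contained sketch of the extraction in plain Java on a hypothetical path. It assumes, as the code above implies, that MagicCommitPaths.magicPathChildren returns the sublist starting at the "__magic" element itself:

import java.util.Arrays;
import java.util.List;

public class TaskAttemptIdExtractionSketch {
  public static void main(String[] args) {
    // hypothetical magic path, already split into elements
    List<String> elements = Arrays.asList(
        "out", "__magic", "job_1700000000000_0001",
        "tasks", "attempt_1700000000000_0001_m_000000_0", "__base", "part-0000");
    // assumed behaviour of magicPathChildren: sublist starting at "__magic"
    int magicIndex = elements.indexOf("__magic");
    List<String> childrenOfMagicPath = elements.subList(magicIndex, elements.size());
    // index 3 is the third child after "__magic", i.e. the task attempt id,
    // which is why the size check must require at least 4 elements
    System.out.println(childrenOfMagicPath.get(3));
    // prints: attempt_1700000000000_0001_m_000000_0
  }
}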
File: MagicCommitTracker.java
@@ -60,13 +60,13 @@ public class MagicCommitTracker extends PutTracker {
public static final Logger LOG = LoggerFactory.getLogger(
MagicCommitTracker.class);

-  private final String originalDestKey;
-  private final String pendingPartKey;
-  private final Path path;
-  private final WriteOperationHelper writer;
-  private final String bucket;
-  private static final byte[] EMPTY = new byte[0];
-  private final PutTrackerStatistics trackerStatistics;
+  protected final String originalDestKey;
+  protected final String pendingPartKey;
+  protected final Path path;
+  protected final WriteOperationHelper writer;
+  protected final String bucket;
+  protected static final byte[] EMPTY = new byte[0];
+  protected final PutTrackerStatistics trackerStatistics;

/**
* Magic commit tracker.
@@ -118,76 +118,21 @@ public boolean outputImmediatelyVisible() {

  /**
   * Complete operation: generate the final commit data, put it.
-  * @param uploadId Upload ID
-  * @param parts list of parts
+  *
+  * @param uploadId Upload ID
+  * @param parts list of parts
+  * @param bytesWritten bytes written
+  * @param iostatistics nullable IO statistics
   * @return false, indicating that the commit must fail.
-  * @throws IOException any IO problem.
+  * @throws IOException any IO problem.
   * @throws IllegalArgumentException bad argument
   */
  @Override
  public boolean aboutToComplete(String uploadId,
      List<CompletedPart> parts,
      long bytesWritten,
-     final IOStatistics iostatistics)
-     throws IOException {
-   Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
-       "empty/null upload ID: "+ uploadId);
-   Preconditions.checkArgument(parts != null,
-       "No uploaded parts list");
-   Preconditions.checkArgument(!parts.isEmpty(),
-       "No uploaded parts to save");
-
-   // put a 0-byte file with the name of the original under-magic path
-   // Add the final file length as a header
-   // this is done before the task commit, so its duration can be
-   // included in the statistics
-   Map<String, String> headers = new HashMap<>();
-   headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
-   PutObjectRequest originalDestPut = writer.createPutObjectRequest(
-       originalDestKey,
-       0,
-       new PutObjectOptions(true, null, headers), false);
-   upload(originalDestPut, new ByteArrayInputStream(EMPTY));
-
-   // build the commit summary
-   SinglePendingCommit commitData = new SinglePendingCommit();
-   commitData.touch(System.currentTimeMillis());
-   commitData.setDestinationKey(getDestKey());
-   commitData.setBucket(bucket);
-   commitData.setUri(path.toUri().toString());
-   commitData.setUploadId(uploadId);
-   commitData.setText("");
-   commitData.setLength(bytesWritten);
-   commitData.bindCommitData(parts);
-   commitData.setIOStatistics(
-       new IOStatisticsSnapshot(iostatistics));
-
-   byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer());
-   LOG.info("Uncommitted data pending to file {};"
-       + " commit metadata for {} parts in {}. size: {} byte(s)",
-       path.toUri(), parts.size(), pendingPartKey, bytesWritten);
-   LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
-       path, pendingPartKey, commitData);
-   PutObjectRequest put = writer.createPutObjectRequest(
-       pendingPartKey,
-       bytes.length, null, false);
-   upload(put, new ByteArrayInputStream(bytes));
+     final IOStatistics iostatistics) throws IOException {
    return false;
-
  }
- /**
-  * PUT an object.
-  * @param request the request
-  * @param inputStream input stream of data to be uploaded
-  * @throws IOException on problems
-  */
- @Retries.RetryTranslated
- private void upload(PutObjectRequest request, InputStream inputStream) throws IOException {
-   trackDurationOfInvocation(trackerStatistics, COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
-       () -> writer.putObject(request, PutObjectOptions.keepingDirs(),
-           new S3ADataBlocks.BlockUploadData(inputStream), false, null));
- }

@Override
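Relaxing these fields from private to protected is what lets the two concrete trackers reuse the base class state. Schematically, with hypothetical names independent of the Hadoop classes:

public class TrackerRefactorSketch {
  abstract static class BaseTracker {
    protected final String destKey;  // protected: visible to subclasses

    BaseTracker(String destKey) {
      this.destKey = destKey;
    }

    // base behaviour: leave the multipart upload pending
    boolean aboutToComplete() {
      return false;
    }
  }

  static class InMemoryTracker extends BaseTracker {
    InMemoryTracker(String destKey) {
      super(destKey);
    }

    @Override
    boolean aboutToComplete() {
      // the subclass reads the protected field directly
      System.out.println("buffering commit metadata for " + destKey + " in memory");
      return false;
    }
  }

  public static void main(String[] args) {
    BaseTracker tracker = new InMemoryTracker("table/part-0000");
    System.out.println(tracker.aboutToComplete());  // false: the MPU stays pending
  }
}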
File: MagicS3GuardCommitter.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.fs.s3a.commit.magic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.slf4j.Logger;
@@ -192,23 +194,9 @@ public void commitTask(TaskAttemptContext context) throws IOException {
*/
private PendingSet innerCommitTask(
TaskAttemptContext context) throws IOException {
-    Path taskAttemptPath = getTaskAttemptPath(context);
-    // load in all pending commits.
-    CommitOperations actions = getCommitOperations();
-    PendingSet pendingSet;
+    PendingSet pendingSet = loadPendingCommits(context);
     try (CommitContext commitContext = initiateTaskOperation(context)) {
-      Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
-          loaded = actions.loadSinglePendingCommits(
-              taskAttemptPath, true, commitContext);
-      pendingSet = loaded.getKey();
-      List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
-      if (!failures.isEmpty()) {
-        // At least one file failed to load
-        // revert all which did; report failure with first exception
-        LOG.error("At least one commit file could not be read: failing");
-        abortPendingUploads(commitContext, pendingSet.getCommits(), true);
-        throw failures.get(0).getValue();
-      }
// patch in IDs
String jobId = getUUID();
String taskId = String.valueOf(context.getTaskAttemptID());
@@ -248,6 +236,66 @@ private PendingSet innerCommitTask(
return pendingSet;
}

  /**
   * Loads pending commits from either the in-memory store or from the
   * task attempt path on S3, depending on the configuration.
   * @param context task attempt context
   * @return the pending commits of this task attempt
   * @throws IOException failure to load a pending commit
   */
  private PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException {
    PendingSet pendingSet = new PendingSet();
    boolean trackMagicCommitsInMemory = context.getConfiguration()
        .getBoolean(CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
            CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT);
    if (trackMagicCommitsInMemory) {
      String taskAttemptId = String.valueOf(context.getTaskAttemptID());
      // remove and return all the pending commit metadata
      // associated with the taskAttemptId.
      List<SinglePendingCommit> pendingCommits =
          InMemoryMagicCommitTracker.taskAttemptIdToMpuMetadataMap.remove(taskAttemptId);
      // remove and return all the paths associated with the taskAttemptId,
      // then drop each path from the bytes-written map,
      // so that no entries leak once the task attempt is completed/committed.
      List<Path> pathsAssociatedWithTaskAttemptId =
          InMemoryMagicCommitTracker.taskAttemptIdToPath.remove(taskAttemptId);
      if (pathsAssociatedWithTaskAttemptId != null) {
        for (Path path : pathsAssociatedWithTaskAttemptId) {
          InMemoryMagicCommitTracker.taskAttemptIdToBytesWritten.remove(path);
        }
      }

      // guard against a task attempt with no tracked commits:
      // remove() returns null when there is no entry for the key.
      if (pendingCommits == null || pendingCommits.isEmpty()) {
        pendingSet.setCommits(new ArrayList<>());
      } else {
        for (SinglePendingCommit singleCommit : pendingCommits) {
          // aggregate stats
          pendingSet.getIOStatistics()
              .aggregate(singleCommit.getIOStatistics());
          // then clear so they aren't marshalled again.
          singleCommit.getIOStatistics().clear();
        }
        pendingSet.setCommits(pendingCommits);
      }
    } else {
      CommitOperations actions = getCommitOperations();
      Path taskAttemptPath = getTaskAttemptPath(context);
      try (CommitContext commitContext = initiateTaskOperation(context)) {
        Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded =
            actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext);
        pendingSet = loaded.getKey();
        List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
        if (!failures.isEmpty()) {
          // At least one file failed to load
          // revert all which did; report failure with first exception
          LOG.error("At least one commit file could not be read: failing");
          abortPendingUploads(commitContext, pendingSet.getCommits(), true);
          throw failures.get(0).getValue();
        }
      }
    }
    return pendingSet;
  }
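The static maps follow a store-then-drain lifecycle: aboutToComplete appends entries as files are closed, and this method removes them wholesale at task commit. A minimal standalone sketch of just the map operations (hypothetical keys and values):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StoreAndDrainSketch {
  // same shape as the tracker's static map: task attempt ID -> commit records
  static final Map<String, List<String>> COMMITS = new ConcurrentHashMap<>();

  public static void main(String[] args) {
    // writer side (aboutToComplete): append each file's commit data under its task attempt
    COMMITS.computeIfAbsent("attempt_0", k -> new ArrayList<>()).add("commit-for-part-0000");
    COMMITS.computeIfAbsent("attempt_0", k -> new ArrayList<>()).add("commit-for-part-0001");

    // committer side (loadPendingCommits): remove() reads and deletes in one step,
    // so the memory is reclaimed as soon as the task attempt is committed
    List<String> drained = COMMITS.remove("attempt_0");
    System.out.println(drained);           // [commit-for-part-0000, commit-for-part-0001]
    System.out.println(COMMITS.isEmpty()); // true: nothing left behind
  }
}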

/**
* Abort a task. Attempt load then abort all pending files,
* then try to delete the task attempt path.
File: S3MagicCommitTracker.java (diff not shown)
