Skip to content

Commit

Permalink
HADOOP-19047: Support InMemory Tracking Of S3A Magic Commits
Browse files Browse the repository at this point in the history
  • Loading branch information
shameersss1 committed Jan 26, 2024
1 parent da34ecd commit a5b27c5
Show file tree
Hide file tree
Showing 12 changed files with 577 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand Down Expand Up @@ -235,6 +236,7 @@
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
Expand Down Expand Up @@ -3906,6 +3908,21 @@ public void access(final Path f, final FsAction mode)
@Retries.RetryTranslated
public FileStatus getFileStatus(final Path f) throws IOException {
Path path = qualify(f);
if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {
// Some downstream apps might call getFileStatus for a magic path to get the file size.
// when commit data is stored in memory construct the dummy S3AFileStatus with correct
// file size fetched from the memory.
if (InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().containsKey(path)) {
long len = InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().get(path);
return new S3AFileStatus(len,
0L,
path,
getDefaultBlockSize(path),
username,
null,
null);
}
}
return trackDurationAndSpan(
INVOCATION_GET_FILE_STATUS, path, () ->
innerGetFileStatus(path, false, StatusProbeEnum.ALL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ private CommitConstants() {
*/
public static final int DEFAULT_COMMITTER_THREADS = 32;


public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED =
"fs.s3a.committer.magic.track.commits.in.memory.enabled";

public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
false;

/**
* Path in the cluster filesystem for temporary data: {@value}.
* This is for HDFS, not the local filesystem.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@

import java.util.List;

import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;

import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;

/**
* Adds the code needed for S3A to support magic committers.
Expand Down Expand Up @@ -105,13 +107,15 @@ public PutTracker createTracker(Path path, String key,
String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
getStoreContext().incrementStatistic(
Statistic.COMMITTER_MAGIC_FILES_CREATED);
tracker = new MagicCommitTracker(path,
getStoreContext().getBucket(),
key,
destKey,
pendingsetPath,
owner.getWriteOperationHelper(),
trackerStatistics);
if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) {
tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(),
key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
trackerStatistics);
} else {
tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(),
key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
trackerStatistics);
}
LOG.debug("Created {}", tracker);
} else {
LOG.warn("File being created has a \"magic\" path, but the filesystem"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.commit.magic;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import software.amazon.awssdk.services.s3.model.CompletedPart;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath;

/**
* InMemoryMagicCommitTracker stores the commit data in memory.
* The commit data and related data stores are flushed out from
* the memory when the task is committed or aborted.
*/
public class InMemoryMagicCommitTracker extends MagicCommitTracker {

// stores taskAttemptId to commit data mapping
private static Map<String, List<SinglePendingCommit>>
taskAttemptIdToMpuMetdadataMap = new ConcurrentHashMap<>();

// stores the path to its length/size mapping
private static Map<Path, Long> taskAttemptIdToBytesWritten = new ConcurrentHashMap<>();

// stores taskAttemptId to path mapping
private static Map<String, List<Path>> taskAttemptIdToPath = new ConcurrentHashMap<>();

public InMemoryMagicCommitTracker(Path path,
String bucket,
String originalDestKey,
String destKey,
String pendingsetKey,
WriteOperationHelper writer,
PutTrackerStatistics trackerStatistics) {
super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics);
}

@Override
public boolean aboutToComplete(String uploadId,
List<CompletedPart> parts,
long bytesWritten,
final IOStatistics iostatistics)
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
"empty/null upload ID: " + uploadId);
Preconditions.checkArgument(parts != null, "No uploaded parts list");
Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save");

// build the commit summary
SinglePendingCommit commitData = new SinglePendingCommit();
commitData.touch(System.currentTimeMillis());
commitData.setDestinationKey(getDestKey());
commitData.setBucket(getBucket());
commitData.setUri(getPath().toUri().toString());
commitData.setUploadId(uploadId);
commitData.setText("");
commitData.setLength(bytesWritten);
commitData.bindCommitData(parts);
commitData.setIOStatistics(new IOStatisticsSnapshot(iostatistics));

// extract the taskAttemptId from the path
String taskAttemptId = extractTaskAttemptIdFromPath(getPath());

// store the commit data with taskAttemptId as the key
taskAttemptIdToMpuMetdadataMap.computeIfAbsent(taskAttemptId,
k -> Collections.synchronizedList(new ArrayList<>())).add(commitData);

// store the byteswritten(length) for the corresponding file
taskAttemptIdToBytesWritten.put(getPath(), bytesWritten);

// store the mapping between taskAttemptId and path
// This information is used for removing entries from
// the map once the taskAttempt is completed/committed.
taskAttemptIdToPath.computeIfAbsent(taskAttemptId,
k -> Collections.synchronizedList(new ArrayList<>())).add(getPath());

LOG.info("commit metadata for {} parts in {}. size: {} byte(s) "
+ "for the taskAttemptId: {} is stored in memory",
parts.size(), getPendingPartKey(), bytesWritten, taskAttemptId);
LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
getPath(), getPendingPartKey(), commitData);

return false;
}

public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetdadataMap() {
return taskAttemptIdToMpuMetdadataMap;
}

public static Map<Path, Long> getTaskAttemptIdToBytesWritten() {
return taskAttemptIdToBytesWritten;
}

public static Map<String, List<Path>> getTaskAttemptIdToPath() {
return taskAttemptIdToPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,22 @@

package org.apache.hadoop.fs.s3a.commit.magic;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3ADataBlocks;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.util.Preconditions;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;

/**
* Put tracker for Magic commits.
Expand All @@ -65,7 +50,7 @@ public class MagicCommitTracker extends PutTracker {
private final Path path;
private final WriteOperationHelper writer;
private final String bucket;
private static final byte[] EMPTY = new byte[0];
protected static final byte[] EMPTY = new byte[0];
private final PutTrackerStatistics trackerStatistics;

/**
Expand Down Expand Up @@ -118,76 +103,21 @@ public boolean outputImmediatelyVisible() {

/**
* Complete operation: generate the final commit data, put it.
* @param uploadId Upload ID
* @param parts list of parts
*
* @param uploadId Upload ID
* @param parts list of parts
* @param bytesWritten bytes written
* @param iostatistics nullable IO statistics
* @return false, indicating that the commit must fail.
* @throws IOException any IO problem.
* @throws IOException any IO problem.
* @throws IllegalArgumentException bad argument
*/
@Override
public boolean aboutToComplete(String uploadId,
List<CompletedPart> parts,
long bytesWritten,
final IOStatistics iostatistics)
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
"empty/null upload ID: "+ uploadId);
Preconditions.checkArgument(parts != null,
"No uploaded parts list");
Preconditions.checkArgument(!parts.isEmpty(),
"No uploaded parts to save");

// put a 0-byte file with the name of the original under-magic path
// Add the final file length as a header
// this is done before the task commit, so its duration can be
// included in the statistics
Map<String, String> headers = new HashMap<>();
headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
PutObjectRequest originalDestPut = writer.createPutObjectRequest(
originalDestKey,
0,
new PutObjectOptions(true, null, headers), false);
upload(originalDestPut, new ByteArrayInputStream(EMPTY));

// build the commit summary
SinglePendingCommit commitData = new SinglePendingCommit();
commitData.touch(System.currentTimeMillis());
commitData.setDestinationKey(getDestKey());
commitData.setBucket(bucket);
commitData.setUri(path.toUri().toString());
commitData.setUploadId(uploadId);
commitData.setText("");
commitData.setLength(bytesWritten);
commitData.bindCommitData(parts);
commitData.setIOStatistics(
new IOStatisticsSnapshot(iostatistics));

byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer());
LOG.info("Uncommitted data pending to file {};"
+ " commit metadata for {} parts in {}. size: {} byte(s)",
path.toUri(), parts.size(), pendingPartKey, bytesWritten);
LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
path, pendingPartKey, commitData);
PutObjectRequest put = writer.createPutObjectRequest(
pendingPartKey,
bytes.length, null, false);
upload(put, new ByteArrayInputStream(bytes));
final IOStatistics iostatistics) throws IOException {
return false;

}
/**
* PUT an object.
* @param request the request
* @param inputStream input stream of data to be uploaded
* @throws IOException on problems
*/
@Retries.RetryTranslated
private void upload(PutObjectRequest request, InputStream inputStream) throws IOException {
trackDurationOfInvocation(trackerStatistics, COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
() -> writer.putObject(request, PutObjectOptions.keepingDirs(),
new S3ADataBlocks.BlockUploadData(inputStream), false, null));
}

@Override
Expand All @@ -201,4 +131,28 @@ public String toString() {
sb.append('}');
return sb.toString();
}

public String getOriginalDestKey() {
return originalDestKey;
}

public String getPendingPartKey() {
return pendingPartKey;
}

public Path getPath() {
return path;
}

public String getBucket() {
return bucket;
}

public WriteOperationHelper getWriter() {
return writer;
}

public PutTrackerStatistics getTrackerStatistics() {
return trackerStatistics;
}
}
Loading

0 comments on commit a5b27c5

Please sign in to comment.