Skip to content

Commit

Permalink
HADOOP-17414. Magic committer files don't have the count of bytes written collected by spark (#2530)
Browse files Browse the repository at this point in the history


This needs SPARK-33739 in the matching spark branch in order to work

Contributed by Steve Loughran.
  • Loading branch information
steveloughran authored Jan 26, 2021
1 parent e2a7008 commit 80c7404
Show file tree
Hide file tree
Showing 27 changed files with 1,391 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.fs.statistics;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;

/**
* Interface for a source of duration tracking.
*
Expand All @@ -36,12 +38,16 @@ public interface DurationTrackerFactory {
* by the given count.
*
* The expected use is within a try-with-resources clause.
*
* The default implementation returns a stub duration tracker.
* @param key statistic key prefix
* @param count #of times to increment the matching counter in this
* operation.
* @return an object to close after an operation completes.
*/
DurationTracker trackDuration(String key, long count);
default DurationTracker trackDuration(String key, long count) {
return stubDurationTracker();
}

/**
* Initiate a duration tracking operation by creating/returning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,24 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_TRUNCATE = "op_truncate";

/* The XAttr API */

/** Invoke {@code getXAttrs(Path path)}: {@value}. */
public static final String OP_XATTR_GET_MAP = "op_xattr_get_map";

/** Invoke {@code getXAttr(Path, String)}: {@value}. */
public static final String OP_XATTR_GET_NAMED = "op_xattr_get_named";

/**
* Invoke {@code getXAttrs(Path path, List<String> names)}: {@value}.
*/
public static final String OP_XATTR_GET_NAMED_MAP =
"op_xattr_get_named_map";

/** Invoke {@code listXAttrs(Path path)}: {@value}. */
public static final String OP_XATTR_LIST = "op_xattr_list";


/** {@value}. */
public static final String DELEGATION_TOKENS_ISSUED
= "delegation_tokens_issued";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,4 +1048,10 @@ private Constants() {
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE
= "fs.s3a.capability.directory.marker.action.delete";

/**
* To comply with the XAttr rules, all headers of the object retrieved
* through the getXAttr APIs have the prefix: {@value}.
*/
public static final String XA_HEADER_PREFIX = "header.";

}
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ private int putObject() throws IOException {
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.createPutObjectRequest(key, uploadData.getFile())
: writeOperationHelper.createPutObjectRequest(key,
uploadData.getUploadStream(), size);
uploadData.getUploadStream(), size, null);
BlockUploadProgress callback =
new BlockUploadProgress(
block, progressListener, now());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
Expand Down Expand Up @@ -330,6 +331,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private DirectoryPolicy directoryPolicy;

/**
* Header processing for XAttr.
*/
private HeaderProcessing headerProcessing;

/**
* Context accessors for re-use.
*/
Expand Down Expand Up @@ -456,6 +462,8 @@ public void initialize(URI name, Configuration originalConf)
magicCommitterEnabled ? "is" : "is not");
committerIntegration = new MagicCommitIntegration(
this, magicCommitterEnabled);
// header processing for rename and magic committer
headerProcessing = new HeaderProcessing(createStoreContext());

// instantiate S3 Select support
selectBinding = new SelectBinding(writeHelper);
Expand Down Expand Up @@ -1781,14 +1789,15 @@ public boolean allowAuthoritative(final Path p) {

/**
* Low-level call to get at the object metadata.
* @param path path to the object
* @param path path to the object. This will be qualified.
* @return metadata
* @throws IOException IO and object access problems.
*/
@VisibleForTesting
@Retries.RetryTranslated
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
return getObjectMetadata(path, null, invoker, null);
return getObjectMetadata(makeQualified(path), null, invoker,
"getObjectMetadata");
}

/**
Expand All @@ -1800,31 +1809,17 @@ public ObjectMetadata getObjectMetadata(Path path) throws IOException {
* @return metadata
* @throws IOException IO and object access problems.
*/
@VisibleForTesting
@Retries.RetryTranslated
public ObjectMetadata getObjectMetadata(Path path,
private ObjectMetadata getObjectMetadata(Path path,
ChangeTracker changeTracker, Invoker changeInvoker, String operation)
throws IOException {
checkNotClosed();
return once("getObjectMetadata", path.toString(),
String key = pathToKey(path);
return once(operation, path.toString(),
() ->
// this always does a full HEAD to the object
getObjectMetadata(
pathToKey(path), changeTracker, changeInvoker, operation));
}

/**
* Get all the headers of the object of a path, if the object exists.
* @param path path to probe
* @return an immutable map of object headers.
* @throws IOException failure of the query
*/
@Retries.RetryTranslated
public Map<String, Object> getObjectHeaders(Path path) throws IOException {
LOG.debug("getObjectHeaders({})", path);
checkNotClosed();
incrementReadOperations();
return getObjectMetadata(path).getRawMetadata();
key, changeTracker, changeInvoker, operation));
}

/**
Expand Down Expand Up @@ -2021,7 +2016,7 @@ protected DurationTrackerFactory getDurationTrackerFactory() {
@Retries.RetryRaw
@VisibleForTesting
ObjectMetadata getObjectMetadata(String key) throws IOException {
return getObjectMetadata(key, null, invoker,null);
return getObjectMetadata(key, null, invoker, "getObjectMetadata");
}

/**
Expand Down Expand Up @@ -4099,59 +4094,8 @@ public int read() throws IOException {
* @return a copy of {@link ObjectMetadata} with only relevant attributes
*/
private ObjectMetadata cloneObjectMetadata(ObjectMetadata source) {
// This approach may be too brittle, especially if
// in future there are new attributes added to ObjectMetadata
// that we do not explicitly call to set here
ObjectMetadata ret = newObjectMetadata(source.getContentLength());

// Possibly null attributes
// Allowing nulls to pass breaks it during later use
if (source.getCacheControl() != null) {
ret.setCacheControl(source.getCacheControl());
}
if (source.getContentDisposition() != null) {
ret.setContentDisposition(source.getContentDisposition());
}
if (source.getContentEncoding() != null) {
ret.setContentEncoding(source.getContentEncoding());
}
if (source.getContentMD5() != null) {
ret.setContentMD5(source.getContentMD5());
}
if (source.getContentType() != null) {
ret.setContentType(source.getContentType());
}
if (source.getExpirationTime() != null) {
ret.setExpirationTime(source.getExpirationTime());
}
if (source.getExpirationTimeRuleId() != null) {
ret.setExpirationTimeRuleId(source.getExpirationTimeRuleId());
}
if (source.getHttpExpiresDate() != null) {
ret.setHttpExpiresDate(source.getHttpExpiresDate());
}
if (source.getLastModified() != null) {
ret.setLastModified(source.getLastModified());
}
if (source.getOngoingRestore() != null) {
ret.setOngoingRestore(source.getOngoingRestore());
}
if (source.getRestoreExpirationTime() != null) {
ret.setRestoreExpirationTime(source.getRestoreExpirationTime());
}
if (source.getSSEAlgorithm() != null) {
ret.setSSEAlgorithm(source.getSSEAlgorithm());
}
if (source.getSSECustomerAlgorithm() != null) {
ret.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm());
}
if (source.getSSECustomerKeyMd5() != null) {
ret.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5());
}

for (Map.Entry<String, String> e : source.getUserMetadata().entrySet()) {
ret.addUserMetadata(e.getKey(), e.getValue());
}
getHeaderProcessing().cloneObjectMetadata(source, ret);
return ret;
}

Expand Down Expand Up @@ -4382,6 +4326,37 @@ public EtagChecksum getFileChecksum(Path f, final long length)
}
}

/**
 * Accessor for the header processing support which the XAttr calls
 * and {@code cloneObjectMetadata()} of this class go through.
 * The field is assigned in {@code initialize()}.
 * @return the header processing of this instance.
 */
private HeaderProcessing getHeaderProcessing() {
  return this.headerProcessing;
}

/**
 * Look up a single named extended attribute of a path.
 * The call is handed off to the header processing support.
 * @param path path to query
 * @param name name of the attribute
 * @return the value returned by the header processing for that name.
 * @throws IOException failure raised by the header processing.
 */
@Override
public byte[] getXAttr(final Path path, final String name)
    throws IOException {
  final HeaderProcessing headers = getHeaderProcessing();
  return headers.getXAttr(path, name);
}

/**
 * Retrieve the map of extended attributes of a path.
 * The call is handed off to the header processing support.
 * @param path path to query
 * @return the attribute map built by the header processing.
 * @throws IOException failure raised by the header processing.
 */
@Override
public Map<String, byte[]> getXAttrs(final Path path) throws IOException {
  final HeaderProcessing headers = getHeaderProcessing();
  return headers.getXAttrs(path);
}

/**
 * Retrieve the extended attributes of a path for a list of names.
 * The call is handed off to the header processing support.
 * @param path path to query
 * @param names names of the attributes to look up
 * @return the attribute map built by the header processing.
 * @throws IOException failure raised by the header processing.
 */
@Override
public Map<String, byte[]> getXAttrs(final Path path,
    final List<String> names)
    throws IOException {
  final HeaderProcessing headers = getHeaderProcessing();
  return headers.getXAttrs(path, names);
}

/**
 * List the names of the extended attributes of a path.
 * The call is handed off to the header processing support.
 * @param path path to query
 * @return the list of names produced by the header processing.
 * @throws IOException failure raised by the header processing.
 */
@Override
public List<String> listXAttrs(final Path path) throws IOException {
  final HeaderProcessing headers = getHeaderProcessing();
  return headers.listXAttrs(path);
}

/**
* {@inheritDoc}.
*
Expand Down Expand Up @@ -5088,5 +5063,11 @@ public Path makeQualified(final Path path) {
return S3AFileSystem.this.makeQualified(path);
}

/**
 * Fetch the metadata of the object stored under a key by calling
 * the owning filesystem's {@code getObjectMetadata(String)} inside
 * a {@code once()} invocation.
 * NOTE(review): presumably {@code once()} is there to translate the
 * raw exceptions of that call; confirm against its definition.
 * @param key object key
 * @return the metadata returned by the filesystem.
 * @throws IOException failure of the metadata request.
 */
@Override
public ObjectMetadata getObjectMetadata(final String key)
    throws IOException {
  return once(
      "getObjectMetadata",
      key,
      () -> S3AFileSystem.this.getObjectMetadata(key));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInstrumentation implements Closeable, MetricsSource,
CountersAndGauges, IOStatisticsSource, DurationTrackerFactory {
CountersAndGauges, IOStatisticsSource {
private static final Logger LOG = LoggerFactory.getLogger(
S3AInstrumentation.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,24 @@ public enum Statistic {
"Calls of rename()",
TYPE_COUNTER),

/*
 * The XAttr API metrics are all durations.
 * Each description names the FileSystem call whose invocations are
 * tracked, matching the javadoc of the StoreStatisticNames key it
 * is bound to.
 */
INVOCATION_XATTR_GET_MAP(
    StoreStatisticNames.OP_XATTR_GET_MAP,
    "Calls of getXAttrs(Path path)",
    TYPE_DURATION),
INVOCATION_XATTR_GET_NAMED(
    StoreStatisticNames.OP_XATTR_GET_NAMED,
    "Calls of getXAttr(Path, String)",
    TYPE_DURATION),
// OP_XATTR_GET_NAMED_MAP is the key for getXAttrs(Path, List<String>)
INVOCATION_XATTR_GET_NAMED_MAP(
    StoreStatisticNames.OP_XATTR_GET_NAMED_MAP,
    "Calls of getXAttrs(Path path, List<String> names)",
    TYPE_DURATION),
// OP_XATTR_LIST is the key for listXAttrs(Path)
INVOCATION_OP_XATTR_LIST(
    StoreStatisticNames.OP_XATTR_LIST,
    "Calls of listXAttrs(Path path)",
    TYPE_DURATION),

/* Object IO */
OBJECT_COPY_REQUESTS(StoreStatisticNames.OBJECT_COPY_REQUESTS,
"Object copy requests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import com.amazonaws.services.s3.model.AmazonS3Exception;
Expand Down Expand Up @@ -172,12 +173,19 @@ public <T> T retry(String action,
* @param destKey destination key
* @param inputStream source data.
* @param length size, if known. Use -1 for not known
* @param headers optional map of custom headers.
* @return the request
*/
public PutObjectRequest createPutObjectRequest(String destKey,
InputStream inputStream, long length) {
InputStream inputStream,
long length,
final Map<String, String> headers) {
ObjectMetadata objectMetadata = newObjectMetadata(length);
if (headers != null) {
objectMetadata.setUserMetadata(headers);
}
return owner.newPutObjectRequest(destKey,
newObjectMetadata(length),
objectMetadata,
inputStream);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
Expand Down Expand Up @@ -77,10 +78,13 @@ <T> T retry(String action,
* @param destKey destination key
* @param inputStream source data.
* @param length size, if known. Use -1 for not known
* @param headers optional map of custom headers.
* @return the request
*/
PutObjectRequest createPutObjectRequest(String destKey,
InputStream inputStream, long length);
InputStream inputStream,
long length,
@Nullable Map<String, String> headers);

/**
* Create a {@link PutObjectRequest} request to upload a file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME_PATTERN;

/**
Expand Down Expand Up @@ -316,4 +317,17 @@ private CommitConstants() {
public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID =
false;

/**
* Header set on the marker objects of magic uploads to declare the
* final length of the file: {@value}.
*/
public static final String X_HEADER_MAGIC_MARKER =
"x-hadoop-s3a-magic-data-length";

/**
* XAttr name of magic marker, with "header." prefix: {@value}.
*/
public static final String XA_MAGIC_MARKER = XA_HEADER_PREFIX
+ X_HEADER_MAGIC_MARKER;

}
Loading

0 comments on commit 80c7404

Please sign in to comment.