From 80c7404b519da8d7d69be4c01eb84dd2c08d80a5 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 26 Jan 2021 19:30:51 +0000 Subject: [PATCH] HADOOP-17414. Magic committer files don't have the count of bytes written collected by spark (#2530) This needs SPARK-33739 in the matching spark branch in order to work Contributed by Steve Loughran. --- .../fs/statistics/DurationTrackerFactory.java | 8 +- .../fs/statistics/StoreStatisticNames.java | 18 + .../org/apache/hadoop/fs/s3a/Constants.java | 6 + .../hadoop/fs/s3a/S3ABlockOutputStream.java | 2 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 127 ++--- .../hadoop/fs/s3a/S3AInstrumentation.java | 2 +- .../org/apache/hadoop/fs/s3a/Statistic.java | 18 + .../hadoop/fs/s3a/WriteOperationHelper.java | 12 +- .../apache/hadoop/fs/s3a/WriteOperations.java | 6 +- .../hadoop/fs/s3a/commit/CommitConstants.java | 14 + .../fs/s3a/commit/CommitOperations.java | 26 + .../s3a/commit/magic/MagicCommitTracker.java | 23 +- .../hadoop/fs/s3a/impl/ContextAccessors.java | 13 + .../hadoop/fs/s3a/impl/HeaderProcessing.java | 500 ++++++++++++++++++ .../fs/s3a/statistics/CountersAndGauges.java | 3 +- .../impl/BondedS3AStatisticsContext.java | 6 + .../hadoop-aws/committer_architecture.md | 10 + .../markdown/tools/hadoop-aws/committers.md | 1 + .../hadoop/fs/s3a/ITestS3AMiscOperations.java | 8 + .../fs/s3a/ITestS3ARemoteFileChanged.java | 4 +- .../s3a/commit/AbstractITCommitProtocol.java | 13 +- .../fs/s3a/commit/ITestCommitOperations.java | 83 ++- .../magic/ITestMagicCommitProtocol.java | 41 +- .../ITestStagingCommitProtocol.java | 13 +- .../hadoop/fs/s3a/impl/ITestXAttrCost.java | 219 ++++++++ .../fs/s3a/impl/TestHeaderProcessing.java | 313 +++++++++++ .../s3a/impl/TestPartialDeleteFailures.java | 10 +- 27 files changed, 1391 insertions(+), 108 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java index b1d87c9100f95..641d7e8368bb1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.statistics; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker; + /** * Interface for a source of duration tracking. * @@ -36,12 +38,16 @@ public interface DurationTrackerFactory { * by the given count. * * The expected use is within a try-with-resources clause. + * + * The default implementation returns a stub duration tracker. * @param key statistic key prefix * @param count #of times to increment the matching counter in this * operation. * @return an object to close after an operation completes. */ - DurationTracker trackDuration(String key, long count); + default DurationTracker trackDuration(String key, long count) { + return stubDurationTracker(); + } /** * Initiate a duration tracking operation by creating/returning diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java index 4baf37d10fd77..0dd6540dc02a1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java @@ -130,6 +130,24 @@ public final class StoreStatisticNames { /** {@value}. */ public static final String OP_TRUNCATE = "op_truncate"; + /* The XAttr API */ + + /** Invoke {@code getXAttrs(Path path)}: {@value}. */ + public static final String OP_XATTR_GET_MAP = "op_xattr_get_map"; + + /** Invoke {@code getXAttr(Path, String)}: {@value}. */ + public static final String OP_XATTR_GET_NAMED = "op_xattr_get_named"; + + /** + * Invoke {@code getXAttrs(Path path, List names)}: {@value}. + */ + public static final String OP_XATTR_GET_NAMED_MAP = + "op_xattr_get_named_map"; + + /** Invoke {@code listXAttrs(Path path)}: {@value}. */ + public static final String OP_XATTR_LIST = "op_xattr_list"; + + /** {@value}. */ public static final String DELEGATION_TOKENS_ISSUED = "delegation_tokens_issued"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 49a0f8105a9be..d14a82e5c304c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1048,4 +1048,10 @@ private Constants() { public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE = "fs.s3a.capability.directory.marker.action.delete"; + /** + * To comply with the XAttr rules, all headers of the object retrieved + * through the getXAttr APIs have the prefix: {@value}. + */ + public static final String XA_HEADER_PREFIX = "header."; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 0fdad2150b6c7..5784ab8615e6e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -447,7 +447,7 @@ private int putObject() throws IOException { final PutObjectRequest putObjectRequest = uploadData.hasFile() ? writeOperationHelper.createPutObjectRequest(key, uploadData.getFile()) : writeOperationHelper.createPutObjectRequest(key, - uploadData.getUploadStream(), size); + uploadData.getUploadStream(), size, null); BlockUploadProgress callback = new BlockUploadProgress( block, progressListener, now()); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index f60ff75c7d6bb..d643e89433a76 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -107,6 +107,7 @@ import org.apache.hadoop.fs.s3a.impl.DeleteOperation; import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy; import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl; +import org.apache.hadoop.fs.s3a.impl.HeaderProcessing; import org.apache.hadoop.fs.s3a.impl.InternalConstants; import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks; import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport; @@ -330,6 +331,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private DirectoryPolicy directoryPolicy; + /** + * Header processing for XAttr. + */ + private HeaderProcessing headerProcessing; + /** * Context accessors for re-use. */ @@ -456,6 +462,8 @@ public void initialize(URI name, Configuration originalConf) magicCommitterEnabled ? "is" : "is not"); committerIntegration = new MagicCommitIntegration( this, magicCommitterEnabled); + // header processing for rename and magic committer + headerProcessing = new HeaderProcessing(createStoreContext()); // instantiate S3 Select support selectBinding = new SelectBinding(writeHelper); @@ -1781,14 +1789,15 @@ public boolean allowAuthoritative(final Path p) { /** * Low-level call to get at the object metadata. - * @param path path to the object + * @param path path to the object. This will be qualified. * @return metadata * @throws IOException IO and object access problems. */ @VisibleForTesting @Retries.RetryTranslated public ObjectMetadata getObjectMetadata(Path path) throws IOException { - return getObjectMetadata(path, null, invoker, null); + return getObjectMetadata(makeQualified(path), null, invoker, + "getObjectMetadata"); } /** @@ -1800,31 +1809,17 @@ public ObjectMetadata getObjectMetadata(Path path) throws IOException { * @return metadata * @throws IOException IO and object access problems. */ - @VisibleForTesting @Retries.RetryTranslated - public ObjectMetadata getObjectMetadata(Path path, + private ObjectMetadata getObjectMetadata(Path path, ChangeTracker changeTracker, Invoker changeInvoker, String operation) throws IOException { checkNotClosed(); - return once("getObjectMetadata", path.toString(), + String key = pathToKey(path); + return once(operation, path.toString(), () -> // this always does a full HEAD to the object getObjectMetadata( - pathToKey(path), changeTracker, changeInvoker, operation)); - } - - /** - * Get all the headers of the object of a path, if the object exists. - * @param path path to probe - * @return an immutable map of object headers. - * @throws IOException failure of the query - */ - @Retries.RetryTranslated - public Map getObjectHeaders(Path path) throws IOException { - LOG.debug("getObjectHeaders({})", path); - checkNotClosed(); - incrementReadOperations(); - return getObjectMetadata(path).getRawMetadata(); + key, changeTracker, changeInvoker, operation)); } /** @@ -2021,7 +2016,7 @@ protected DurationTrackerFactory getDurationTrackerFactory() { @Retries.RetryRaw @VisibleForTesting ObjectMetadata getObjectMetadata(String key) throws IOException { - return getObjectMetadata(key, null, invoker,null); + return getObjectMetadata(key, null, invoker, "getObjectMetadata"); } /** @@ -4099,59 +4094,8 @@ public int read() throws IOException { * @return a copy of {@link ObjectMetadata} with only relevant attributes */ private ObjectMetadata cloneObjectMetadata(ObjectMetadata source) { - // This approach may be too brittle, especially if - // in future there are new attributes added to ObjectMetadata - // that we do not explicitly call to set here ObjectMetadata ret = newObjectMetadata(source.getContentLength()); - - // Possibly null attributes - // Allowing nulls to pass breaks it during later use - if (source.getCacheControl() != null) { - ret.setCacheControl(source.getCacheControl()); - } - if (source.getContentDisposition() != null) { - ret.setContentDisposition(source.getContentDisposition()); - } - if (source.getContentEncoding() != null) { - ret.setContentEncoding(source.getContentEncoding()); - } - if (source.getContentMD5() != null) { - ret.setContentMD5(source.getContentMD5()); - } - if (source.getContentType() != null) { - ret.setContentType(source.getContentType()); - } - if (source.getExpirationTime() != null) { - ret.setExpirationTime(source.getExpirationTime()); - } - if (source.getExpirationTimeRuleId() != null) { - ret.setExpirationTimeRuleId(source.getExpirationTimeRuleId()); - } - if (source.getHttpExpiresDate() != null) { - ret.setHttpExpiresDate(source.getHttpExpiresDate()); - } - if (source.getLastModified() != null) { - ret.setLastModified(source.getLastModified()); - } - if (source.getOngoingRestore() != null) { - ret.setOngoingRestore(source.getOngoingRestore()); - } - if (source.getRestoreExpirationTime() != null) { - ret.setRestoreExpirationTime(source.getRestoreExpirationTime()); - } - if (source.getSSEAlgorithm() != null) { - ret.setSSEAlgorithm(source.getSSEAlgorithm()); - } - if (source.getSSECustomerAlgorithm() != null) { - ret.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm()); - } - if (source.getSSECustomerKeyMd5() != null) { - ret.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5()); - } - - for (Map.Entry e : source.getUserMetadata().entrySet()) { - ret.addUserMetadata(e.getKey(), e.getValue()); - } + getHeaderProcessing().cloneObjectMetadata(source, ret); return ret; } @@ -4382,6 +4326,37 @@ public EtagChecksum getFileChecksum(Path f, final long length) } } + /** + * Get header processing support. + * @return the header processing of this instance. + */ + private HeaderProcessing getHeaderProcessing() { + return headerProcessing; + } + + @Override + public byte[] getXAttr(final Path path, final String name) + throws IOException { + return getHeaderProcessing().getXAttr(path, name); + } + + @Override + public Map getXAttrs(final Path path) throws IOException { + return getHeaderProcessing().getXAttrs(path); + } + + @Override + public Map getXAttrs(final Path path, + final List names) + throws IOException { + return getHeaderProcessing().getXAttrs(path, names); + } + + @Override + public List listXAttrs(final Path path) throws IOException { + return getHeaderProcessing().listXAttrs(path); + } + /** * {@inheritDoc}. * @@ -5088,5 +5063,11 @@ public Path makeQualified(final Path path) { return S3AFileSystem.this.makeQualified(path); } + @Override + public ObjectMetadata getObjectMetadata(final String key) + throws IOException { + return once("getObjectMetadata", key, () -> + S3AFileSystem.this.getObjectMetadata(key)); + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index c25e3b3c0efb5..5fcc15774761b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -120,7 +120,7 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class S3AInstrumentation implements Closeable, MetricsSource, - CountersAndGauges, IOStatisticsSource, DurationTrackerFactory { + CountersAndGauges, IOStatisticsSource { private static final Logger LOG = LoggerFactory.getLogger( S3AInstrumentation.class); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 6709382baf54d..0bd2a622f4f7b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -157,6 +157,24 @@ public enum Statistic { "Calls of rename()", TYPE_COUNTER), + /* The XAttr API metrics are all durations */ + INVOCATION_XATTR_GET_MAP( + StoreStatisticNames.OP_XATTR_GET_MAP, + "Calls of getXAttrs(Path path)", + TYPE_DURATION), + INVOCATION_XATTR_GET_NAMED( + StoreStatisticNames.OP_XATTR_GET_NAMED, + "Calls of getXAttr(Path, String)", + TYPE_DURATION), + INVOCATION_XATTR_GET_NAMED_MAP( + StoreStatisticNames.OP_XATTR_GET_NAMED_MAP, + "Calls of xattr()", + TYPE_DURATION), + INVOCATION_OP_XATTR_LIST( + StoreStatisticNames.OP_XATTR_LIST, + "Calls of getXAttrs(Path path, List names)", + TYPE_DURATION), + /* Object IO */ OBJECT_COPY_REQUESTS(StoreStatisticNames.OBJECT_COPY_REQUESTS, "Object copy requests", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index e75c09c9f4c52..49a5eb276caf0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -25,6 +25,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.services.s3.model.AmazonS3Exception; @@ -172,12 +173,19 @@ public T retry(String action, * @param destKey destination key * @param inputStream source data. * @param length size, if known. Use -1 for not known + * @param headers optional map of custom headers. * @return the request */ public PutObjectRequest createPutObjectRequest(String destKey, - InputStream inputStream, long length) { + InputStream inputStream, + long length, + final Map headers) { + ObjectMetadata objectMetadata = newObjectMetadata(length); + if (headers != null) { + objectMetadata.setUserMetadata(headers); + } return owner.newPutObjectRequest(destKey, - newObjectMetadata(length), + objectMetadata, inputStream); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 0b336142ccc9f..2636ed7e3284c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; @@ -77,10 +78,13 @@ T retry(String action, * @param destKey destination key * @param inputStream source data. * @param length size, if known. Use -1 for not known + * @param headers optional map of custom headers. * @return the request */ PutObjectRequest createPutObjectRequest(String destKey, - InputStream inputStream, long length); + InputStream inputStream, + long length, + @Nullable Map headers); /** * Create a {@link PutObjectRequest} request to upload a file. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java index 3224a5ab36d76..60939967e2d25 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX; import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME_PATTERN; /** @@ -316,4 +317,17 @@ private CommitConstants() { public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID = false; + /** + * Magic Marker header to declare final file length on magic uploads + * marker objects: {@value}. + */ + public static final String X_HEADER_MAGIC_MARKER = + "x-hadoop-s3a-magic-data-length"; + + /** + * XAttr name of magic marker, with "header." prefix: {@value}. + */ + public static final String XA_MAGIC_MARKER = XA_HEADER_PREFIX + + X_HEADER_MAGIC_MARKER; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java index c9fb3806b2c5a..4562e0f751523 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -39,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; @@ -50,6 +52,7 @@ import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.fs.s3a.impl.HeaderProcessing; import org.apache.hadoop.fs.s3a.impl.InternalConstants; import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; @@ -606,6 +609,29 @@ public CommitContext initiateCommitOperation(Path path) throws IOException { return new CommitContext(writeOperations.initiateCommitOperation(path)); } + /** + * Get the magic file length of a file. + * If the FS doesn't support the API, the attribute is missing or + * the parse to long fails, then Optional.empty() is returned. + * Static for some easier testability. + * @param fs filesystem + * @param path path + * @return either a length or None. + * @throws IOException on error + * */ + public static Optional extractMagicFileLength(FileSystem fs, Path path) + throws IOException { + byte[] bytes; + try { + bytes = fs.getXAttr(path, XA_MAGIC_MARKER); + } catch (UnsupportedOperationException e) { + // FS doesn't support xattr. + LOG.debug("Filesystem {} doesn't support XAttr API", fs); + return Optional.empty(); + } + return HeaderProcessing.extractXAttrLongValue(bytes); + } + /** * Commit context. * diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java index 0f1a0a6534df9..ddaee19f9f74a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java @@ -20,7 +20,9 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; @@ -37,6 +39,8 @@ import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER; + /** * Put tracker for Magic commits. *

Important

: must not directly or indirectly import a class which @@ -122,13 +126,6 @@ public boolean aboutToComplete(String uploadId, Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save"); - // put a 0-byte file with the name of the original under-magic path - PutObjectRequest originalDestPut = writer.createPutObjectRequest( - originalDestKey, - new ByteArrayInputStream(EMPTY), - 0); - writer.uploadObject(originalDestPut); - // build the commit summary SinglePendingCommit commitData = new SinglePendingCommit(); commitData.touch(System.currentTimeMillis()); @@ -150,9 +147,19 @@ public boolean aboutToComplete(String uploadId, PutObjectRequest put = writer.createPutObjectRequest( pendingPartKey, new ByteArrayInputStream(bytes), - bytes.length); + bytes.length, null); writer.uploadObject(put); + // Add the final file length as a header + Map headers = new HashMap<>(); + headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten)); + // now put a 0-byte file with the name of the original under-magic path + PutObjectRequest originalDestPut = writer.createPutObjectRequest( + originalDestKey, + new ByteArrayInputStream(EMPTY), + 0, + headers); + writer.uploadObject(originalDestPut); return false; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java index d39c649df2e22..27ac7dec1dd19 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.file.AccessDeniedException; +import com.amazonaws.services.s3.model.ObjectMetadata; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Retries; @@ -81,4 +83,15 @@ public interface ContextAccessors { * @return possibly new path. */ Path makeQualified(Path path); + + /** + * Retrieve the object metadata. + * + * @param key key to retrieve. + * @return metadata + * @throws IOException IO and object access problems. + */ + @Retries.RetryTranslated + ObjectMetadata getObjectMetadata(String key) throws IOException; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java new file mode 100644 index 0000000000000..5efec2b36dafe --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import javax.annotation.Nullable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; + +import com.amazonaws.services.s3.Headers; +import com.amazonaws.services.s3.model.ObjectMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; + +import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX; +import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST; +import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP; +import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED; +import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED_MAP; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; + +/** + * Part of the S3A FS where object headers are + * processed. + * Implements all the various XAttr read operations. + * Those APIs all expect byte arrays back. + * Metadata cloning is also implemented here, so as + * to stay in sync with custom header logic. + * + * The standard header names are extracted from the AWS SDK. + * The S3A connector does not (currently) support setting them, + * though it would be possible to do so through the createFile() + * builder API. + */ +public class HeaderProcessing extends AbstractStoreOperation { + + private static final Logger LOG = LoggerFactory.getLogger( + HeaderProcessing.class); + + /** + * An empty buffer. + */ + private static final byte[] EMPTY = new byte[0]; + + + /** + * Standard HTTP header found on some S3 objects: {@value}. + */ + public static final String XA_CACHE_CONTROL = + XA_HEADER_PREFIX + Headers.CACHE_CONTROL; + /** + * Standard HTTP header found on some S3 objects: {@value}. + */ + public static final String XA_CONTENT_DISPOSITION = + XA_HEADER_PREFIX + Headers.CONTENT_DISPOSITION; + + /** + * Standard HTTP header found on some S3 objects: {@value}. + */ + public static final String XA_CONTENT_ENCODING = + XA_HEADER_PREFIX + Headers.CONTENT_ENCODING; + + /** + * Standard HTTP header found on some S3 objects: {@value}. + */ + public static final String XA_CONTENT_LANGUAGE = + XA_HEADER_PREFIX + Headers.CONTENT_LANGUAGE; + + /** + * Length XAttr: {@value}. + */ + public static final String XA_CONTENT_LENGTH = + XA_HEADER_PREFIX + Headers.CONTENT_LENGTH; + + /** + * Standard HTTP header found on some S3 objects: {@value}. + */ + public static final String XA_CONTENT_MD5 = + XA_HEADER_PREFIX + Headers.CONTENT_MD5; + + /** + * Content range: {@value}. + * This is returned on GET requests with ranges. + */ + public static final String XA_CONTENT_RANGE = + XA_HEADER_PREFIX + Headers.CONTENT_RANGE; + + /** + * Content type: may be set when uploading. + * {@value}. + */ + public static final String XA_CONTENT_TYPE = + XA_HEADER_PREFIX + Headers.CONTENT_TYPE; + + /** + * Etag Header {@value}. + * Also accessible via {@code ObjectMetadata.getEtag()}, where + * it can be retrieved via {@code getFileChecksum(path)} if + * the S3A connector is enabled. + */ + public static final String XA_ETAG = XA_HEADER_PREFIX + Headers.ETAG; + + + /** + * last modified XAttr: {@value}. + */ + public static final String XA_LAST_MODIFIED = + XA_HEADER_PREFIX + Headers.LAST_MODIFIED; + + /* AWS Specific Headers. May not be found on other S3 endpoints. */ + + /** + * object archive status; empty if not on S3 Glacier + * (i.e all normal files should be non-archived as + * S3A and applications don't handle archived data) + * Value {@value}. + */ + public static final String XA_ARCHIVE_STATUS = + XA_HEADER_PREFIX + Headers.ARCHIVE_STATUS; + + /** + * Object legal hold status. {@value}. + */ + public static final String XA_OBJECT_LOCK_LEGAL_HOLD_STATUS = + XA_HEADER_PREFIX + Headers.OBJECT_LOCK_LEGAL_HOLD_STATUS; + + /** + * Object lock mode. {@value}. + */ + public static final String XA_OBJECT_LOCK_MODE = + XA_HEADER_PREFIX + Headers.OBJECT_LOCK_MODE; + + /** + * ISO8601 expiry date of object lock hold. {@value}. + */ + public static final String XA_OBJECT_LOCK_RETAIN_UNTIL_DATE = + XA_HEADER_PREFIX + Headers.OBJECT_LOCK_RETAIN_UNTIL_DATE; + + /** + * Replication status for cross-region replicated objects. {@value}. + */ + public static final String XA_OBJECT_REPLICATION_STATUS = + XA_HEADER_PREFIX + Headers.OBJECT_REPLICATION_STATUS; + + /** + * Version ID; empty for non-versioned buckets/data. {@value}. + */ + public static final String XA_S3_VERSION_ID = + XA_HEADER_PREFIX + Headers.S3_VERSION_ID; + + /** + * The server-side encryption algorithm to use + * with AWS-managed keys: {@value}. + */ + public static final String XA_SERVER_SIDE_ENCRYPTION = + XA_HEADER_PREFIX + Headers.SERVER_SIDE_ENCRYPTION; + + /** + * Storage Class XAttr: {@value}. + */ + public static final String XA_STORAGE_CLASS = + XA_HEADER_PREFIX + Headers.STORAGE_CLASS; + + /** + * Standard headers which are retrieved from HEAD Requests + * and set as XAttrs if the response included the relevant header. + */ + public static final String[] XA_STANDARD_HEADERS = { + /* HTTP standard headers */ + XA_CACHE_CONTROL, + XA_CONTENT_DISPOSITION, + XA_CONTENT_ENCODING, + XA_CONTENT_LANGUAGE, + XA_CONTENT_LENGTH, + XA_CONTENT_MD5, + XA_CONTENT_RANGE, + XA_CONTENT_TYPE, + XA_ETAG, + XA_LAST_MODIFIED, + /* aws headers */ + XA_ARCHIVE_STATUS, + XA_OBJECT_LOCK_LEGAL_HOLD_STATUS, + XA_OBJECT_LOCK_MODE, + XA_OBJECT_LOCK_RETAIN_UNTIL_DATE, + XA_OBJECT_REPLICATION_STATUS, + XA_S3_VERSION_ID, + XA_SERVER_SIDE_ENCRYPTION, + XA_STORAGE_CLASS, + }; + + /** + * Content type of generic binary objects. + * This is the default for uploaded objects. + */ + public static final String CONTENT_TYPE_OCTET_STREAM = + "application/octet-stream"; + + /** + * XML content type : {@value}. + * This is application/xml, not text/xml, and is + * what a HEAD of / returns as the type of a root path. + */ + public static final String CONTENT_TYPE_APPLICATION_XML = + "application/xml"; + + /** + * Construct. + * @param storeContext store context. + */ + public HeaderProcessing(final StoreContext storeContext) { + super(storeContext); + } + + /** + * Query the store, get all the headers into a map. Each Header + * has the "header." prefix. + * Caller must have read access. + * The value of each header is the string value of the object + * UTF-8 encoded. + * @param path path of object. + * @param statistic statistic to use for duration tracking. + * @return the headers + * @throws IOException failure, including file not found. + */ + private Map retrieveHeaders( + final Path path, + final Statistic statistic) throws IOException { + StoreContext context = getStoreContext(); + ContextAccessors accessors = context.getContextAccessors(); + String objectKey = accessors.pathToKey(path); + ObjectMetadata md; + String symbol = statistic.getSymbol(); + S3AStatisticsContext instrumentation = context.getInstrumentation(); + try { + md = trackDuration(instrumentation, symbol, () -> + accessors.getObjectMetadata(objectKey)); + } catch (FileNotFoundException e) { + // no entry. It could be a directory, so try again. + md = trackDuration(instrumentation, symbol, () -> + accessors.getObjectMetadata(objectKey + "/")); + } + // all user metadata + Map rawHeaders = md.getUserMetadata(); + Map headers = new TreeMap<>(); + rawHeaders.forEach((key, value) -> + headers.put(XA_HEADER_PREFIX + key, encodeBytes(value))); + + // and add the usual content length &c, if set + maybeSetHeader(headers, XA_CACHE_CONTROL, + md.getCacheControl()); + maybeSetHeader(headers, XA_CONTENT_DISPOSITION, + md.getContentDisposition()); + maybeSetHeader(headers, XA_CONTENT_ENCODING, + md.getContentEncoding()); + maybeSetHeader(headers, XA_CONTENT_LANGUAGE, + md.getContentLanguage()); + maybeSetHeader(headers, XA_CONTENT_LENGTH, + md.getContentLength()); + maybeSetHeader(headers, XA_CONTENT_MD5, + md.getContentMD5()); + maybeSetHeader(headers, XA_CONTENT_RANGE, + md.getContentRange()); + maybeSetHeader(headers, XA_CONTENT_TYPE, + md.getContentType()); + maybeSetHeader(headers, XA_ETAG, + md.getETag()); + maybeSetHeader(headers, XA_LAST_MODIFIED, + md.getLastModified()); + + // AWS custom headers + maybeSetHeader(headers, XA_ARCHIVE_STATUS, + md.getArchiveStatus()); + maybeSetHeader(headers, XA_OBJECT_LOCK_LEGAL_HOLD_STATUS, + md.getObjectLockLegalHoldStatus()); + maybeSetHeader(headers, XA_OBJECT_LOCK_MODE, + md.getObjectLockMode()); + maybeSetHeader(headers, XA_OBJECT_LOCK_RETAIN_UNTIL_DATE, + md.getObjectLockRetainUntilDate()); + maybeSetHeader(headers, XA_OBJECT_REPLICATION_STATUS, + md.getReplicationStatus()); + maybeSetHeader(headers, XA_S3_VERSION_ID, + md.getVersionId()); + maybeSetHeader(headers, XA_SERVER_SIDE_ENCRYPTION, + md.getSSEAlgorithm()); + maybeSetHeader(headers, XA_STORAGE_CLASS, + md.getStorageClass()); + maybeSetHeader(headers, XA_STORAGE_CLASS, + md.getReplicationStatus()); + return headers; + } + + /** + * Set a header if the value is non null. + * + * @param headers header map + * @param name header name + * @param value value to encode. + */ + private void maybeSetHeader( + final Map headers, + final String name, + final Object value) { + if (value != null) { + headers.put(name, encodeBytes(value)); + } + } + + /** + * Stringify an object and return its bytes in UTF-8 encoding. + * @param s source + * @return encoded object or an empty buffer + */ + public static byte[] encodeBytes(@Nullable Object s) { + return s == null + ? EMPTY + : s.toString().getBytes(StandardCharsets.UTF_8); + } + + /** + * Get the string value from the bytes. + * if null : return null, otherwise the UTF-8 decoded + * bytes. + * @param bytes source bytes + * @return decoded value + */ + public static String decodeBytes(byte[] bytes) { + return bytes == null + ? null + : new String(bytes, StandardCharsets.UTF_8); + } + + /** + * Get an XAttr name and value for a file or directory. + * @param path Path to get extended attribute + * @param name XAttr name. + * @return byte[] XAttr value or null + * @throws IOException IO failure + */ + public byte[] getXAttr(Path path, String name) throws IOException { + return retrieveHeaders(path, INVOCATION_XATTR_GET_NAMED).get(name); + } + + /** + * See {@code FileSystem.getXAttrs(path}. + * + * @param path Path to get extended attributes + * @return Map describing the XAttrs of the file or directory + * @throws IOException IO failure + */ + public Map getXAttrs(Path path) throws IOException { + return retrieveHeaders(path, INVOCATION_XATTR_GET_MAP); + } + + /** + * See {@code FileSystem.listXAttrs(path)}. + * @param path Path to get extended attributes + * @return List of supported XAttrs + * @throws IOException IO failure + */ + public List listXAttrs(final Path path) throws IOException { + return new ArrayList<>(retrieveHeaders(path, INVOCATION_OP_XATTR_LIST) + .keySet()); + } + + /** + * See {@code FileSystem.getXAttrs(path, names}. + * @param path Path to get extended attributes + * @param names XAttr names. + * @return Map describing the XAttrs of the file or directory + * @throws IOException IO failure + */ + public Map getXAttrs(Path path, List names) + throws IOException { + Map headers = retrieveHeaders(path, + INVOCATION_XATTR_GET_NAMED_MAP); + Map result = new TreeMap<>(); + headers.entrySet().stream() + .filter(entry -> names.contains(entry.getKey())) + .forEach(entry -> result.put(entry.getKey(), entry.getValue())); + return result; + } + + /** + * Convert an XAttr byte array to a long. + * testability. + * @param data data to parse + * @return either a length or none + */ + public static Optional extractXAttrLongValue(byte[] data) { + String xAttr; + xAttr = HeaderProcessing.decodeBytes(data); + if (StringUtils.isNotEmpty(xAttr)) { + try { + long l = Long.parseLong(xAttr); + if (l >= 0) { + return Optional.of(l); + } + } catch (NumberFormatException ex) { + LOG.warn("Not a number: {}", xAttr, ex); + } + } + // missing/empty header or parse failure. + return Optional.empty(); + } + + /** + * Creates a copy of the passed {@link ObjectMetadata}. + * Does so without using the {@link ObjectMetadata#clone()} method, + * to avoid copying unnecessary headers. + * This operation does not copy the {@code X_HEADER_MAGIC_MARKER} + * header to avoid confusion. If a marker file is renamed, + * it loses information about any remapped file. + * If new fields are added to ObjectMetadata which are not + * present in the user metadata headers, they will not be picked + * up or cloned unless this operation is updated. + * @param source the {@link ObjectMetadata} to copy + * @param dest the metadata to update; this is the return value. + */ + public void cloneObjectMetadata(ObjectMetadata source, + ObjectMetadata dest) { + + // Possibly null attributes + // Allowing nulls to pass breaks it during later use + if (source.getCacheControl() != null) { + dest.setCacheControl(source.getCacheControl()); + } + if (source.getContentDisposition() != null) { + dest.setContentDisposition(source.getContentDisposition()); + } + if (source.getContentEncoding() != null) { + dest.setContentEncoding(source.getContentEncoding()); + } + if (source.getContentMD5() != null) { + dest.setContentMD5(source.getContentMD5()); + } + if (source.getContentType() != null) { + dest.setContentType(source.getContentType()); + } + if (source.getExpirationTime() != null) { + dest.setExpirationTime(source.getExpirationTime()); + } + if (source.getExpirationTimeRuleId() != null) { + dest.setExpirationTimeRuleId(source.getExpirationTimeRuleId()); + } + if (source.getHttpExpiresDate() != null) { + dest.setHttpExpiresDate(source.getHttpExpiresDate()); + } + if (source.getLastModified() != null) { + dest.setLastModified(source.getLastModified()); + } + if (source.getOngoingRestore() != null) { + dest.setOngoingRestore(source.getOngoingRestore()); + } + if (source.getRestoreExpirationTime() != null) { + dest.setRestoreExpirationTime(source.getRestoreExpirationTime()); + } + if (source.getSSEAlgorithm() != null) { + dest.setSSEAlgorithm(source.getSSEAlgorithm()); + } + if (source.getSSECustomerAlgorithm() != null) { + dest.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm()); + } + if (source.getSSECustomerKeyMd5() != null) { + dest.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5()); + } + + // copy user metadata except the magic marker header. + source.getUserMetadata().entrySet().stream() + .filter(e -> !e.getKey().equals(X_HEADER_MAGIC_MARKER)) + .forEach(e -> dest.addUserMetadata(e.getKey(), e.getValue())); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java index 61cc0330936d8..f9093ff7117cc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java @@ -21,11 +21,12 @@ import java.time.Duration; import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; /** * This is the foundational API for collecting S3A statistics. */ -public interface CountersAndGauges { +public interface CountersAndGauges extends DurationTrackerFactory { /** * Increment a specific counter. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java index 006eb24f72c85..51bb4afebc4ff 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk; +import org.apache.hadoop.fs.statistics.DurationTracker; /** * An S3A statistics context which is bonded to a @@ -210,6 +211,11 @@ public S3AMultipartUploaderStatistics createMultipartUploaderStatistics() { return new S3AMultipartUploaderStatisticsImpl(this::incrementCounter); } + @Override + public DurationTracker trackDuration(final String key, final long count) { + return getInstrumentation().trackDuration(key, count); + } + /** * This is the interface which an integration source must implement * for the integration. diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md index 48d75dc79532c..048f08cf7c0f0 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md @@ -1337,6 +1337,16 @@ On `close()`, summary data would be written to the file `/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending`. This would contain the upload ID and all the parts and etags of uploaded data. +A marker file is also created, so that code which verifies that a newly created file +exists does not fail. +1. These marker files are zero bytes long. +1. They declare the full length of the final file in the HTTP header + `x-hadoop-s3a-magic-data-length`. +1. A call to `getXAttr("header.x-hadoop-s3a-magic-data-length")` will return a + string containing the number of bytes in the data uploaded. + +This is needed so that the Spark write-tracking code can report how much data +has been created. #### Task commit diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md index 0e86f5244067b..d4292df03a9fb 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md @@ -360,6 +360,7 @@ However, it has extra requirements of the filesystem 1. The S3A client must be configured to recognize interactions with the magic directories and treat them specially. +Now that Amazon S3 is consistent, the magic committer is enabled by default. It's also not been field tested to the extent of Netflix's committer; consider it the least mature of the committers. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java index 2f0599dc8c374..e6ebfba922d5f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java @@ -29,6 +29,7 @@ import com.amazonaws.services.s3.model.GetBucketEncryptionResult; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; +import org.assertj.core.api.Assertions; import org.junit.Assume; import org.junit.Test; @@ -47,6 +48,7 @@ import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_ETAG; import static org.hamcrest.Matchers.nullValue; /** @@ -171,6 +173,9 @@ public void testEmptyFileChecksums() throws Throwable { assertNotEquals("file 1 checksum", 0, checksum1.getLength()); assertEquals("checksums of empty files", checksum1, fs.getFileChecksum(touchFile("file2"), 0)); + Assertions.assertThat(fs.getXAttr(file1, XA_ETAG)) + .describedAs("etag from xattr") + .isEqualTo(checksum1.getBytes()); } /** @@ -222,6 +227,9 @@ public void testNonEmptyFileChecksums() throws Throwable { createFile(fs, file4, true, "hello, world".getBytes(StandardCharsets.UTF_8)); assertNotEquals(checksum2, fs.getFileChecksum(file4, 0)); + Assertions.assertThat(fs.getXAttr(file3, XA_ETAG)) + .describedAs("etag from xattr") + .isEqualTo(checksum1.getBytes()); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java index 66ec8ff3a22dc..adcf578b05862 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java @@ -272,7 +272,9 @@ public void setup() throws Exception { @Override public void teardown() throws Exception { // restore the s3 client so there's no mocking interfering with the teardown - originalS3Client.ifPresent(fs::setAmazonS3Client); + if (fs != null) { + originalS3Client.ifPresent(fs::setAmazonS3Client); + } super.teardown(); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index 1108194fb5cb5..14207e8359788 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -1335,11 +1335,12 @@ public void testOutputFormatIntegration() throws Throwable { = outputFormat.getRecordWriter(tContext); IntWritable iw = new IntWritable(1); recordWriter.write(iw, iw); + long expectedLength = 4; Path dest = recordWriter.getDest(); - validateTaskAttemptPathDuringWrite(dest); + validateTaskAttemptPathDuringWrite(dest, expectedLength); recordWriter.close(tContext); // at this point - validateTaskAttemptPathAfterWrite(dest); + validateTaskAttemptPathAfterWrite(dest, expectedLength); assertTrue("Committer does not have data to commit " + committer, committer.needsTaskCommit(tContext)); commitTask(committer, tContext); @@ -1750,9 +1751,11 @@ public void testS3ACommitterFactoryBinding() throws Throwable { * Validate the path of a file being written to during the write * itself. * @param p path + * @param expectedLength * @throws IOException IO failure */ - protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException { + protected void validateTaskAttemptPathDuringWrite(Path p, + final long expectedLength) throws IOException { } @@ -1760,9 +1763,11 @@ protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException { * Validate the path of a file being written to after the write * operation has completed. * @param p path + * @param expectedLength * @throws IOException IO failure */ - protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException { + protected void validateTaskAttemptPathAfterWrite(Path p, + final long expectedLength) throws IOException { } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java index 978f08c9b898e..b025f6f0969fe 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java @@ -26,6 +26,7 @@ import com.amazonaws.services.s3.model.PartETag; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +53,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.CommitOperations.extractMagicFileLength; import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -216,13 +218,13 @@ private static Path makeMagic(Path destFile) { @Test public void testCommitEmptyFile() throws Throwable { - describe("create then commit an empty file"); + describe("create then commit an empty magic file"); createCommitAndVerify("empty-commit.txt", new byte[0]); } @Test public void testCommitSmallFile() throws Throwable { - describe("create then commit an empty file"); + describe("create then commit a small magic file"); createCommitAndVerify("small-commit.txt", DATASET); } @@ -288,6 +290,64 @@ public void testBaseRelativePath() throws Throwable { commit("child.txt", pendingChildPath, expectedDestPath, 0, 0); } + /** + * Verify that that when a marker file is renamed, its + * magic marker attribute is lost. + */ + @Test + public void testMarkerFileRename() + throws Exception { + S3AFileSystem fs = getFileSystem(); + Path destFile = methodPath(); + Path destDir = destFile.getParent(); + fs.delete(destDir, true); + Path magicDest = makeMagic(destFile); + Path magicDir = magicDest.getParent(); + fs.mkdirs(magicDir); + + // use the builder API to verify it works exactly the + // same. + try (FSDataOutputStream stream = fs.createFile(magicDest) + .overwrite(true) + .recursive() + .build()) { + assertIsMagicStream(stream); + stream.write(DATASET); + } + Path magic2 = new Path(magicDir, "magic2"); + // rename the marker + fs.rename(magicDest, magic2); + + // the renamed file has no header + Assertions.assertThat(extractMagicFileLength(fs, magic2)) + .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magic2) + .isEmpty(); + // abort the upload, which is driven by the .pending files + // there must be 1 deleted file; during test debugging with aborted + // runs there may be more. + Assertions.assertThat(newCommitOperations() + .abortPendingUploadsUnderPath(destDir)) + .describedAs("Aborting all pending uploads under %s", destDir) + .isGreaterThanOrEqualTo(1); + } + + /** + * Assert that an output stream is magic. + * @param stream stream to probe. + */ + protected void assertIsMagicStream(final FSDataOutputStream stream) { + Assertions.assertThat(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT)) + .describedAs("Stream capability %s in stream %s", + STREAM_CAPABILITY_MAGIC_OUTPUT, stream) + .isTrue(); + } + + /** + * Create a file through the magic commit mechanism. + * @param filename file to create (with __magic path.) + * @param data data to write + * @throws Exception failure + */ private void createCommitAndVerify(String filename, byte[] data) throws Exception { S3AFileSystem fs = getFileSystem(); @@ -295,19 +355,30 @@ private void createCommitAndVerify(String filename, byte[] data) fs.delete(destFile.getParent(), true); Path magicDest = makeMagic(destFile); assertPathDoesNotExist("Magic file should not exist", magicDest); + long dataSize = data != null ? data.length : 0; try(FSDataOutputStream stream = fs.create(magicDest, true)) { - assertTrue(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT)); - if (data != null && data.length > 0) { + assertIsMagicStream(stream); + if (dataSize > 0) { stream.write(data); } stream.close(); } FileStatus status = getFileStatusEventually(fs, magicDest, CONSISTENCY_WAIT); - assertEquals("Non empty marker file: " + status, 0, status.getLen()); - + assertEquals("Magic marker file is not zero bytes: " + status, + 0, 0); + Assertions.assertThat(extractMagicFileLength(fs, + magicDest)) + .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magicDest) + .isNotEmpty() + .hasValue(dataSize); commit(filename, destFile, HIGH_THROTTLE, 0); verifyFileContents(fs, destFile, data); + // the destination file doesn't have the attribute + Assertions.assertThat(extractMagicFileLength(fs, + destFile)) + .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + destFile) + .isEmpty(); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java index f6d6307b5d8bb..7ee1833ba2f58 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java @@ -20,17 +20,21 @@ import java.io.IOException; import java.net.URI; +import java.util.List; import org.assertj.core.api.Assertions; import org.junit.Test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol; import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.CommitOperations; import org.apache.hadoop.fs.s3a.commit.CommitUtils; import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection; import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl; @@ -39,6 +43,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.hamcrest.CoreMatchers.containsString; @@ -107,18 +112,44 @@ public MagicS3GuardCommitter createFailingCommitter( return new CommitterWithFailedThenSucceed(getOutDir(), tContext); } - protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException { + protected void validateTaskAttemptPathDuringWrite(Path p, + final long expectedLength) throws IOException { String pathStr = p.toString(); assertTrue("not magic " + pathStr, pathStr.contains(MAGIC)); assertPathDoesNotExist("task attempt visible", p); } - protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException { - FileStatus st = getFileSystem().getFileStatus(p); - assertEquals("file length in " + st, 0, st.getLen()); - Path pendingFile = new Path(p.toString() + PENDING_SUFFIX); + protected void validateTaskAttemptPathAfterWrite(Path marker, + final long expectedLength) throws IOException { + // the pending file exists + Path pendingFile = new Path(marker.toString() + PENDING_SUFFIX); assertPathExists("pending file", pendingFile); + S3AFileSystem fs = getFileSystem(); + + // THIS SEQUENCE MUST BE RUN IN ORDER ON A S3GUARDED + // STORE + // if you list the parent dir and find the marker, it + // is really 0 bytes long + String name = marker.getName(); + List filtered = listAndFilter(fs, + marker.getParent(), false, + (path) -> path.getName().equals(name)); + Assertions.assertThat(filtered) + .hasSize(1); + Assertions.assertThat(filtered.get(0)) + .matches(lst -> lst.getLen() == 0, + "Listing should return 0 byte length"); + + // marker file is empty + FileStatus st = fs.getFileStatus(marker); + assertEquals("file length in " + st, 0, st.getLen()); + // xattr header + Assertions.assertThat(CommitOperations.extractMagicFileLength(fs, + marker)) + .describedAs("XAttribute " + XA_MAGIC_MARKER) + .isNotEmpty() + .hasValue(expectedLength); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java index a4dfacead38d9..826c3cd2743a7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; @@ -113,14 +114,20 @@ protected void expectJobCommitToFail(JobContext jContext, IOException.class); } - protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException { + protected void validateTaskAttemptPathDuringWrite(Path p, + final long expectedLength) throws IOException { // this is expected to be local FS ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p); } - protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException { + protected void validateTaskAttemptPathAfterWrite(Path p, + final long expectedLength) throws IOException { // this is expected to be local FS - ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p); + // this is expected to be local FS + FileSystem localFS = getLocalFS(); + ContractTestUtils.assertPathExists(localFS, "task attempt", p); + FileStatus st = localFS.getFileStatus(p); + assertEquals("file length in " + st, expectedLength, st.getLen()); } protected FileSystem getLocalFS() throws IOException { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java new file mode 100644 index 0000000000000..aa3cecaf1eb7a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.assertj.core.api.AbstractStringAssert; +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; + +import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST; +import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP; +import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_APPLICATION_XML; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_TYPE; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_STANDARD_HEADERS; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes; +import static org.apache.hadoop.fs.s3a.performance.OperationCost.CREATE_FILE_OVERWRITE; + +/** + * Invoke XAttr API calls against objects in S3 and validate header + * extraction. + */ +public class ITestXAttrCost extends AbstractS3ACostTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestXAttrCost.class); + + private static final int GET_METADATA_ON_OBJECT = 1; + private static final int GET_METADATA_ON_DIR = GET_METADATA_ON_OBJECT * 2; + + public ITestXAttrCost() { + // no parameterization here + super(false, true, false); + } + + @Test + public void testXAttrRoot() throws Throwable { + describe("Test xattr on root"); + Path root = new Path("/"); + S3AFileSystem fs = getFileSystem(); + Map xAttrs = verifyMetrics( + () -> fs.getXAttrs(root), + with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT)); + logXAttrs(xAttrs); + List headerList = verifyMetrics(() -> + fs.listXAttrs(root), + with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT)); + + // verify this contains all the standard markers, + // but not the magic marker header + Assertions.assertThat(headerList) + .describedAs("Headers on root object") + .containsOnly( + XA_CONTENT_LENGTH, + XA_CONTENT_TYPE); + assertHeaderEntry(xAttrs, XA_CONTENT_TYPE) + .isEqualTo(CONTENT_TYPE_APPLICATION_XML); + } + + /** + * Log the attributes as strings. + * @param xAttrs map of attributes + */ + private void logXAttrs(final Map xAttrs) { + xAttrs.forEach((k, v) -> + LOG.info("{} has bytes[{}] => \"{}\"", + k, v.length, decodeBytes(v))); + } + + @Test + public void testXAttrFile() throws Throwable { + describe("Test xattr on a file"); + Path testFile = methodPath(); + create(testFile, true, CREATE_FILE_OVERWRITE); + S3AFileSystem fs = getFileSystem(); + Map xAttrs = verifyMetrics(() -> + fs.getXAttrs(testFile), + with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT)); + logXAttrs(xAttrs); + assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH) + .isEqualTo("0"); + + // get the list of supported headers + List headerList = verifyMetrics( + () -> fs.listXAttrs(testFile), + with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT)); + // verify this contains all the standard markers, + // but not the magic marker header + Assertions.assertThat(headerList) + .describedAs("Supported headers") + .containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS)); + + // ask for one header and validate its value + byte[] bytes = verifyMetrics(() -> + fs.getXAttr(testFile, XA_CONTENT_LENGTH), + with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_OBJECT)); + assertHeader(XA_CONTENT_LENGTH, bytes) + .isEqualTo("0"); + assertHeaderEntry(xAttrs, XA_CONTENT_TYPE) + .isEqualTo(CONTENT_TYPE_OCTET_STREAM); + } + + /** + * Directory attributes can be retrieved, but they take two HEAD requests. + * @throws Throwable + */ + @Test + public void testXAttrDir() throws Throwable { + describe("Test xattr on a dir"); + + S3AFileSystem fs = getFileSystem(); + Path dir = methodPath(); + fs.mkdirs(dir); + Map xAttrs = verifyMetrics(() -> + fs.getXAttrs(dir), + with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_DIR)); + logXAttrs(xAttrs); + assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH) + .isEqualTo("0"); + + // get the list of supported headers + List headerList = verifyMetrics( + () -> fs.listXAttrs(dir), + with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_DIR)); + // verify this contains all the standard markers, + // but not the magic marker header + Assertions.assertThat(headerList) + .describedAs("Supported headers") + .containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS)); + + // ask for one header and validate its value + byte[] bytes = verifyMetrics(() -> + fs.getXAttr(dir, XA_CONTENT_LENGTH), + with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_DIR)); + assertHeader(XA_CONTENT_LENGTH, bytes) + .isEqualTo("0"); + assertHeaderEntry(xAttrs, XA_CONTENT_TYPE) + .isEqualTo(CONTENT_TYPE_OCTET_STREAM); + } + + /** + * When the operations are called on a missing path, FNFE is + * raised and only one attempt is made to retry the operation. + */ + @Test + public void testXAttrMissingFile() throws Throwable { + describe("Test xattr on a missing path"); + Path testFile = methodPath(); + S3AFileSystem fs = getFileSystem(); + int getMetadataOnMissingFile = GET_METADATA_ON_DIR; + verifyMetricsIntercepting(FileNotFoundException.class, "", () -> + fs.getXAttrs(testFile), + with(INVOCATION_XATTR_GET_MAP, getMetadataOnMissingFile)); + verifyMetricsIntercepting(FileNotFoundException.class, "", () -> + fs.getXAttr(testFile, XA_CONTENT_LENGTH), + with(INVOCATION_XATTR_GET_NAMED, getMetadataOnMissingFile)); + verifyMetricsIntercepting(FileNotFoundException.class, "", () -> + fs.listXAttrs(testFile), + with(INVOCATION_OP_XATTR_LIST, getMetadataOnMissingFile)); + } + + /** + * Generate an assert on a named header in the map. + * @param xAttrs attribute map + * @param key header key + * @return the assertion + */ + private AbstractStringAssert assertHeaderEntry( + Map xAttrs, String key) { + + return assertHeader(key, xAttrs.get(key)); + } + + /** + * Create an assertion on the header; check for the bytes + * being non-null/empty and then returns the decoded values + * as a string assert. + * @param key header key (for error) + * @param bytes value + * @return the assertion + */ + private AbstractStringAssert assertHeader(final String key, + final byte[] bytes) { + + String decoded = decodeBytes(bytes); + return Assertions.assertThat(decoded) + .describedAs("xattr %s decoded to: %s", key, decoded) + .isNotNull() + .isNotEmpty(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java new file mode 100644 index 0000000000000..e0c6feeb256cc --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.amazonaws.services.s3.model.ObjectMetadata; +import org.assertj.core.api.Assertions; +import org.assertj.core.util.Lists; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.test.OperationTrackingStore; +import org.apache.hadoop.test.HadoopTestBase; + +import static java.lang.System.currentTimeMillis; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_LAST_MODIFIED; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.encodeBytes; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.extractXAttrLongValue; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests of header processing logic in {@link HeaderProcessing}. + * Builds up a context accessor where the path + * defined in {@link #MAGIC_PATH} exists and returns object metadata. + * + */ +public class TestHeaderProcessing extends HadoopTestBase { + + private static final XAttrContextAccessor CONTEXT_ACCESSORS + = new XAttrContextAccessor(); + + public static final String VALUE = "abcdeFGHIJ123!@##&82;"; + + public static final long FILE_LENGTH = 1024; + + private static final String FINAL_FILE = "s3a://bucket/dest/output.csv"; + + private StoreContext context; + + private HeaderProcessing headerProcessing; + + private static final String MAGIC_KEY + = "dest/__magic/job1/ta1/__base/output.csv"; + private static final String MAGIC_FILE + = "s3a://bucket/" + MAGIC_KEY; + + private static final Path MAGIC_PATH = + new Path(MAGIC_FILE); + + public static final long MAGIC_LEN = 4096L; + + /** + * All the XAttrs which are built up. + */ + private static final String[] RETRIEVED_XATTRS = { + XA_MAGIC_MARKER, + XA_CONTENT_LENGTH, + XA_LAST_MODIFIED + }; + + @Before + public void setup() throws Exception { + CONTEXT_ACCESSORS.len = FILE_LENGTH; + CONTEXT_ACCESSORS.userHeaders.put( + X_HEADER_MAGIC_MARKER, + Long.toString(MAGIC_LEN)); + context = S3ATestUtils.createMockStoreContext(true, + new OperationTrackingStore(), CONTEXT_ACCESSORS); + headerProcessing = new HeaderProcessing(context); + } + + @Test + public void testByteRoundTrip() throws Throwable { + Assertions.assertThat(decodeBytes(encodeBytes(VALUE))) + .describedAs("encoding of " + VALUE) + .isEqualTo(VALUE); + } + + @Test + public void testGetMarkerXAttr() throws Throwable { + assertAttributeHasValue(XA_MAGIC_MARKER, MAGIC_LEN); + } + + @Test + public void testGetLengthXAttr() throws Throwable { + assertAttributeHasValue(XA_CONTENT_LENGTH, FILE_LENGTH); + } + + /** + * Last modified makes it through. + */ + @Test + public void testGetDateXAttr() throws Throwable { + Assertions.assertThat( + decodeBytes(headerProcessing.getXAttr(MAGIC_PATH, + XA_LAST_MODIFIED))) + .describedAs("XAttribute " + XA_LAST_MODIFIED) + .isEqualTo(CONTEXT_ACCESSORS.date.toString()); + } + + /** + * The API calls on unknown paths raise 404s. + */ + @Test + public void test404() throws Throwable { + intercept(FileNotFoundException.class, () -> + headerProcessing.getXAttr(new Path(FINAL_FILE), XA_MAGIC_MARKER)); + } + + /** + * This call returns all the attributes which aren't null, including + * all the standard HTTP headers. + */ + @Test + public void testGetAllXAttrs() throws Throwable { + Map xAttrs = headerProcessing.getXAttrs(MAGIC_PATH); + Assertions.assertThat(xAttrs.keySet()) + .describedAs("Attribute keys") + .contains(RETRIEVED_XATTRS); + } + + /** + * This call returns all the attributes which aren't null, including + * all the standard HTTP headers. + */ + @Test + public void testListXAttrKeys() throws Throwable { + List xAttrs = headerProcessing.listXAttrs(MAGIC_PATH); + Assertions.assertThat(xAttrs) + .describedAs("Attribute keys") + .contains(RETRIEVED_XATTRS); + } + + /** + * Filtering is on attribute key, not header. + */ + @Test + public void testGetFilteredXAttrs() throws Throwable { + Map xAttrs = headerProcessing.getXAttrs(MAGIC_PATH, + Lists.list(XA_MAGIC_MARKER, XA_CONTENT_LENGTH, "unknown")); + Assertions.assertThat(xAttrs.keySet()) + .describedAs("Attribute keys") + .containsExactlyInAnyOrder(XA_MAGIC_MARKER, XA_CONTENT_LENGTH); + // and the values are good + assertLongAttributeValue( + XA_MAGIC_MARKER, + xAttrs.get(XA_MAGIC_MARKER), + MAGIC_LEN); + assertLongAttributeValue( + XA_CONTENT_LENGTH, + xAttrs.get(XA_CONTENT_LENGTH), + FILE_LENGTH); + } + + /** + * An empty list of keys results in empty results. + */ + @Test + public void testFilterEmptyXAttrs() throws Throwable { + Map xAttrs = headerProcessing.getXAttrs(MAGIC_PATH, + Lists.list()); + Assertions.assertThat(xAttrs.keySet()) + .describedAs("Attribute keys") + .isEmpty(); + } + + /** + * Add two headers to the metadata, then verify that + * the magic marker header is copied, but not the other header. + */ + @Test + public void testMetadataCopySkipsMagicAttribute() throws Throwable { + + final String owner = "x-header-owner"; + final String root = "root"; + CONTEXT_ACCESSORS.userHeaders.put(owner, root); + final ObjectMetadata source = context.getContextAccessors() + .getObjectMetadata(MAGIC_KEY); + final Map sourceUserMD = source.getUserMetadata(); + Assertions.assertThat(sourceUserMD.get(owner)) + .describedAs("owner header in copied MD") + .isEqualTo(root); + + ObjectMetadata dest = new ObjectMetadata(); + headerProcessing.cloneObjectMetadata(source, dest); + + Assertions.assertThat(dest.getUserMetadata().get(X_HEADER_MAGIC_MARKER)) + .describedAs("Magic marker header in copied MD") + .isNull(); + Assertions.assertThat(dest.getUserMetadata().get(owner)) + .describedAs("owner header in copied MD") + .isEqualTo(root); + } + + /** + * Assert that an XAttr has a specific long value. + * @param key attribute key + * @param bytes bytes of the attribute. + * @param expected expected numeric value. + */ + private void assertLongAttributeValue( + final String key, + final byte[] bytes, + final long expected) { + Assertions.assertThat(extractXAttrLongValue(bytes)) + .describedAs("XAttribute " + key) + .isNotEmpty() + .hasValue(expected); + } + + /** + * Assert that a retrieved XAttr has a specific long value. + * @param key attribute key + * @param expected expected numeric value. + */ + protected void assertAttributeHasValue(final String key, + final long expected) + throws IOException { + assertLongAttributeValue( + key, + headerProcessing.getXAttr(MAGIC_PATH, key), + expected); + } + + /** + * Context accessor with XAttrs returned for the {@link #MAGIC_PATH} + * path. + */ + private static final class XAttrContextAccessor + implements ContextAccessors { + + private final Map userHeaders = new HashMap<>(); + + private long len; + private Date date = new Date(currentTimeMillis()); + + @Override + public Path keyToPath(final String key) { + return new Path("s3a://bucket/" + key); + } + + @Override + public String pathToKey(final Path path) { + // key is path with leading / stripped. + String key = path.toUri().getPath(); + return key.length() > 1 ? key.substring(1) : key; + } + + @Override + public File createTempFile(final String prefix, final long size) + throws IOException { + throw new UnsupportedOperationException("unsppported"); + } + + @Override + public String getBucketLocation() throws IOException { + return null; + } + + @Override + public Path makeQualified(final Path path) { + return path; + } + + @Override + public ObjectMetadata getObjectMetadata(final String key) + throws IOException { + if (MAGIC_KEY.equals(key)) { + ObjectMetadata omd = new ObjectMetadata(); + omd.setUserMetadata(userHeaders); + omd.setContentLength(len); + omd.setLastModified(date); + return omd; + } else { + throw new FileNotFoundException(key); + } + } + + public void setHeader(String key, String val) { + userHeaders.put(key, val); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java index a2e7031a1b276..42714cb1555e4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java @@ -29,6 +29,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.MultiObjectDeleteException; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; +import com.amazonaws.services.s3.model.ObjectMetadata; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Test; @@ -226,7 +227,8 @@ public void testProcessDeleteFailure() throws Throwable { } - private static class MinimalContextAccessor implements ContextAccessors { + private static final class MinimalContextAccessor + implements ContextAccessors { @Override public Path keyToPath(final String key) { @@ -253,6 +255,12 @@ public String getBucketLocation() throws IOException { public Path makeQualified(final Path path) { return path; } + + @Override + public ObjectMetadata getObjectMetadata(final String key) + throws IOException { + return new ObjectMetadata(); + } } }