-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-18382. SDK upgrade prerequisites #4698
Changes from all commits
21b8741
5c56159
952bbcc
e877e68
36c1f6e
8b7ae99
eb95a0a
5f81bce
f20831f
fa3775c
fe5e2e3
4c652b1
b385452
ca3150f
8ded46f
71ac6f8
bd80a4f
15fe8d9
9619ceb
eb0666a
3b87159
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,8 @@ | |
import com.amazonaws.services.s3.AmazonS3; | ||
import com.amazonaws.services.s3.Headers; | ||
import com.amazonaws.services.s3.model.CannedAccessControlList; | ||
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; | ||
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; | ||
import com.amazonaws.services.s3.model.CopyObjectRequest; | ||
import com.amazonaws.services.s3.model.DeleteObjectsRequest; | ||
import com.amazonaws.services.s3.model.DeleteObjectsResult; | ||
|
@@ -70,6 +72,8 @@ | |
import com.amazonaws.services.s3.model.PutObjectRequest; | ||
import com.amazonaws.services.s3.model.PutObjectResult; | ||
import com.amazonaws.services.s3.model.S3Object; | ||
import com.amazonaws.services.s3.model.SelectObjectContentRequest; | ||
import com.amazonaws.services.s3.model.SelectObjectContentResult; | ||
import com.amazonaws.services.s3.model.StorageClass; | ||
import com.amazonaws.services.s3.model.UploadPartRequest; | ||
import com.amazonaws.services.s3.model.UploadPartResult; | ||
|
@@ -127,6 +131,7 @@ | |
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; | ||
import org.apache.hadoop.fs.s3a.impl.StoreContext; | ||
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; | ||
import org.apache.hadoop.fs.s3a.impl.V2Migration; | ||
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream; | ||
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; | ||
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; | ||
|
@@ -877,6 +882,7 @@ public Listing getListing() { | |
* @param dtEnabled are delegation tokens enabled? | ||
* @throws IOException failure. | ||
*/ | ||
@SuppressWarnings("deprecation") | ||
private void bindAWSClient(URI name, boolean dtEnabled) throws IOException { | ||
Configuration conf = getConf(); | ||
credentials = null; | ||
|
@@ -889,6 +895,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException { | |
// with it if so. | ||
|
||
LOG.debug("Using delegation tokens"); | ||
V2Migration.v1DelegationTokenCredentialProvidersUsed(); | ||
S3ADelegationTokens tokens = new S3ADelegationTokens(); | ||
this.delegationTokens = Optional.of(tokens); | ||
tokens.bindToFileSystem(getCanonicalUri(), | ||
|
@@ -1216,7 +1223,7 @@ public int getDefaultPort() { | |
* This is for internal use within the S3A code itself. | ||
* @return AmazonS3Client | ||
*/ | ||
AmazonS3 getAmazonS3Client() { | ||
private AmazonS3 getAmazonS3Client() { | ||
return s3; | ||
} | ||
|
||
|
@@ -1230,6 +1237,7 @@ AmazonS3 getAmazonS3Client() { | |
@VisibleForTesting | ||
public AmazonS3 getAmazonS3ClientForTesting(String reason) { | ||
LOG.warn("Access to S3A client requested, reason {}", reason); | ||
V2Migration.v1S3ClientRequested(); | ||
return s3; | ||
} | ||
|
||
|
@@ -1614,6 +1622,25 @@ public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) { | |
} | ||
} | ||
|
||
/** | ||
* Callbacks for WriteOperationHelper. | ||
*/ | ||
private final class WriteOperationHelperCallbacksImpl | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i like this. yes, it makes for a bigger patch, but it was needed anyway |
||
implements WriteOperationHelper.WriteOperationHelperCallbacks { | ||
|
||
@Override | ||
public SelectObjectContentResult selectObjectContent(SelectObjectContentRequest request) { | ||
return s3.selectObjectContent(request); | ||
} | ||
|
||
@Override | ||
public CompleteMultipartUploadResult completeMultipartUpload( | ||
CompleteMultipartUploadRequest request) { | ||
return s3.completeMultipartUpload(request); | ||
} | ||
} | ||
|
||
|
||
/** | ||
* Create the read context for reading from the referenced file, | ||
* using FS state as well as the status. | ||
|
@@ -1838,7 +1865,8 @@ public WriteOperationHelper createWriteOperationHelper(AuditSpan auditSpan) { | |
getConf(), | ||
statisticsContext, | ||
getAuditSpanSource(), | ||
auditSpan); | ||
auditSpan, | ||
new WriteOperationHelperCallbacksImpl()); | ||
} | ||
|
||
/** | ||
|
@@ -2319,6 +2347,7 @@ public int getMaxKeys() { | |
@Retries.RetryTranslated | ||
@InterfaceStability.Evolving | ||
public ObjectMetadata getObjectMetadata(Path path) throws IOException { | ||
V2Migration.v1GetObjectMetadataCalled(); | ||
return trackDurationAndSpan(INVOCATION_GET_FILE_STATUS, path, () -> | ||
getObjectMetadata(makeQualified(path), null, invoker, | ||
"getObjectMetadata")); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ | |
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; | ||
import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider; | ||
import org.apache.hadoop.fs.s3a.impl.NetworkBinding; | ||
import org.apache.hadoop.fs.s3a.impl.V2Migration; | ||
import org.apache.hadoop.fs.s3native.S3xLoginHelper; | ||
import org.apache.hadoop.net.ConnectTimeoutException; | ||
import org.apache.hadoop.security.ProviderUtils; | ||
|
@@ -551,6 +552,7 @@ public static long dateToLong(final Date date) { | |
/** | ||
* The standard AWS provider list for AWS connections. | ||
*/ | ||
@SuppressWarnings("deprecation") | ||
public static final List<Class<?>> | ||
STANDARD_AWS_PROVIDERS = Collections.unmodifiableList( | ||
Arrays.asList( | ||
|
@@ -637,6 +639,10 @@ public static AWSCredentialProviderList buildAWSProviderList( | |
AWSCredentialProviderList providers = new AWSCredentialProviderList(); | ||
for (Class<?> aClass : awsClasses) { | ||
|
||
if (aClass.getName().contains(AWS_AUTH_CLASS_PREFIX)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again, log through the proposed LogOnce log |
||
V2Migration.v1ProviderReferenced(aClass.getName()); | ||
} | ||
|
||
if (forbidden.contains(aClass)) { | ||
throw new IOException(E_FORBIDDEN_AWS_PROVIDER | ||
+ " in option " + key + ": " + aClass); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -144,20 +144,26 @@ public class WriteOperationHelper implements WriteOperations { | |
*/ | ||
private final RequestFactory requestFactory; | ||
|
||
/** | ||
* WriteOperationHelper callbacks. | ||
*/ | ||
private final WriteOperationHelperCallbacks writeOperationHelperCallbacks; | ||
|
||
/** | ||
* Constructor. | ||
* @param owner owner FS creating the helper | ||
* @param conf Configuration object | ||
* @param statisticsContext statistics context | ||
* @param auditSpanSource source of spans | ||
* @param auditSpan span to activate | ||
* | ||
* @param writeOperationHelperCallbacks callbacks used by writeOperationHelper | ||
*/ | ||
protected WriteOperationHelper(S3AFileSystem owner, | ||
Configuration conf, | ||
S3AStatisticsContext statisticsContext, | ||
final AuditSpanSource auditSpanSource, | ||
final AuditSpan auditSpan) { | ||
final AuditSpan auditSpan, | ||
final WriteOperationHelperCallbacks writeOperationHelperCallbacks) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: needs a javadoc entry |
||
this.owner = owner; | ||
this.invoker = new Invoker(new S3ARetryPolicy(conf), | ||
this::operationRetried); | ||
|
@@ -168,6 +174,7 @@ protected WriteOperationHelper(S3AFileSystem owner, | |
this.auditSpanSource = auditSpanSource; | ||
this.auditSpan = checkNotNull(auditSpan); | ||
this.requestFactory = owner.getRequestFactory(); | ||
this.writeOperationHelperCallbacks = writeOperationHelperCallbacks; | ||
} | ||
|
||
/** | ||
|
@@ -359,8 +366,7 @@ private CompleteMultipartUploadResult finalizeMultipartUpload( | |
final CompleteMultipartUploadRequest request = | ||
getRequestFactory().newCompleteMultipartUploadRequest( | ||
destKey, uploadId, partETags); | ||
return owner.getAmazonS3Client().completeMultipartUpload( | ||
request); | ||
return writeOperationHelperCallbacks.completeMultipartUpload(request); | ||
}); | ||
owner.finishedWrite(destKey, length, uploadResult.getETag(), | ||
uploadResult.getVersionId(), | ||
|
@@ -716,7 +722,7 @@ public SelectObjectContentResult select( | |
try (DurationInfo ignored = | ||
new DurationInfo(LOG, "S3 Select operation")) { | ||
try { | ||
return owner.getAmazonS3Client().selectObjectContent(request); | ||
return writeOperationHelperCallbacks.selectObjectContent(request); | ||
} catch (AmazonS3Exception e) { | ||
LOG.error("Failure of S3 Select request against {}", | ||
source); | ||
|
@@ -758,4 +764,25 @@ public RequestFactory getRequestFactory() { | |
return requestFactory; | ||
} | ||
|
||
/*** | ||
* Callbacks for writeOperationHelper. | ||
*/ | ||
public interface WriteOperationHelperCallbacks { | ||
|
||
/** | ||
* Initiates a select request. | ||
* @param request selectObjectContent request | ||
* @return selectObjectContentResult | ||
*/ | ||
SelectObjectContentResult selectObjectContent(SelectObjectContentRequest request); | ||
|
||
/** | ||
* Initiates a complete multi-part upload request. | ||
* @param request Complete multi-part upload request | ||
* @return completeMultipartUploadResult | ||
*/ | ||
CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request); | ||
|
||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have a getClient() call too; i will comment on that in the conversation