From fc78ea4f723fef773af63d0f0d06c240d02c1a22 Mon Sep 17 00:00:00 2001
From: AlexYue
Date: Mon, 24 Jun 2024 21:58:35 +0800
Subject: [PATCH] [feature](Azure) Support copy into on Azure Blob Storage (#36554)

---
 .../org/apache/doris/analysis/CopyStmt.java   |   1 +
 .../doris/cloud/storage/AzureRemote.java      | 304 ++++++++++++++++++
 .../apache/doris/cloud/storage/BosRemote.java |   3 +-
 .../apache/doris/cloud/storage/CosRemote.java |   2 +-
 .../apache/doris/cloud/storage/ObsRemote.java |   4 +-
 .../apache/doris/cloud/storage/OssRemote.java |   2 +-
 .../doris/cloud/storage/RemoteBase.java       |   4 +
 .../apache/doris/cloud/storage/S3Remote.java  |   2 +-
 8 files changed, 316 insertions(+), 6 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
index 564d332d3217fc..80ba68ac575507 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
@@ -205,6 +205,7 @@ private void analyzeStagePB(StagePB stagePB) throws AnalysisException {
         }
         brokerProperties.put(S3_BUCKET, objInfo.getBucket());
         brokerProperties.put(S3_PREFIX, objInfo.getPrefix());
+        brokerProperties.put(S3Properties.PROVIDER, objInfo.getProvider().toString());
         StageProperties stageProperties = new StageProperties(stagePB.getPropertiesMap());
         this.copyIntoProperties.mergeProperties(stageProperties);
         this.copyIntoProperties.analyze();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java
new file mode 100644
index 00000000000000..0dc0cf60019c40
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/AzureRemote.java
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.storage;
+
+import org.apache.doris.common.DdlException;
+
+import com.azure.core.credential.AccessToken;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.core.http.rest.PagedResponse;
+import com.azure.core.http.rest.Response;
+import com.azure.core.util.Context;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobContainerClientBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.batch.BlobBatch;
+import com.azure.storage.blob.batch.BlobBatchClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.blob.models.UserDelegationKey;
+import com.azure.storage.blob.sas.BlobContainerSasPermission;
+import com.azure.storage.blob.sas.BlobSasPermission;
+import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.common.sas.SasProtocol;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.http.HttpStatus;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import reactor.core.publisher.Mono;
+
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link RemoteBase} implementation backed by an Azure Blob Storage container.
+ *
+ * <p>The storage account name is taken from {@code obj.getAk()}, the account key from
+ * {@code obj.getSk()} and the container name from {@code obj.getBucket()}.
+ */
+public class AzureRemote extends RemoteBase {
+
+    private static final Logger LOG = LogManager.getLogger(AzureRemote.class);
+
+    private static final String URI_TEMPLATE = "https://%s.blob.core.windows.net/%s";
+
+    // Upper bound on the number of blobs deleted with one batch request.
+    // NOTE(review): the Blob Batch REST API documents a limit of 256 sub-requests per batch.
+    private static final int MAX_DELETE_BATCH_SIZE = 256;
+
+    // Lazily created by initClient() and dropped again by close().
+    private BlobContainerClient client;
+
+    public AzureRemote(ObjectInfo obj) {
+        super(obj);
+    }
+
+    /**
+     * Builds a SAS-signed HTTPS URL that grants read, write and delete access to one blob
+     * for {@code SESSION_EXPIRE_SECOND} seconds.
+     *
+     * @param fileName name of the file; passed through {@code normalizePrefix} to get the blob name
+     * @return the blob URL followed by {@code ?} and the SAS token, or an empty string when the
+     *         URL could not be generated (the failure is logged)
+     */
+    @Override
+    public String getPresignedUrl(String fileName) {
+        try {
+            // Always sign with the account key (obj.getSk()); obj.getToken() is not used here.
+            BlobClient blobClient = buildContainerClient(false).getBlobClient(normalizePrefix(fileName));
+
+            OffsetDateTime expiryTime = OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND);
+            BlobSasPermission permission = new BlobSasPermission()
+                    .setReadPermission(true)
+                    .setWritePermission(true)
+                    .setDeletePermission(true);
+
+            BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(expiryTime, permission)
+                    .setProtocol(SasProtocol.HTTPS_ONLY)
+                    .setStartTime(OffsetDateTime.now());
+
+            String sasToken = blobClient.generateSas(sasValues);
+            return blobClient.getBlobUrl() + "?" + sasToken;
+        } catch (Exception e) {
+            // Keep the empty-string contract for callers, but do not lose the failure.
+            LOG.warn("Failed to get presigned url for Azure, fileName={}", fileName, e);
+        }
+        return "";
+    }
+
+    @Override
+    public ListObjectsResult listObjects(String continuationToken) throws DdlException {
+        return listObjectsInner(normalizePrefix(), continuationToken);
+    }
+
+    @Override
+    public ListObjectsResult listObjects(String subPrefix, String continuationToken) throws DdlException {
+        return listObjectsInner(normalizePrefix(subPrefix), continuationToken);
+    }
+
+    /**
+     * Looks up a single blob by key.
+     *
+     * @param subKey key of the object; passed through {@code normalizePrefix} to get the blob name
+     * @return a result holding the blob, or an empty result when Azure answers 404
+     * @throws DdlException for any other {@link BlobStorageException}
+     */
+    @Override
+    public ListObjectsResult headObject(String subKey) throws DdlException {
+        initClient();
+        try {
+            String key = normalizePrefix(subKey);
+            BlobClient blobClient = client.getBlobClient(key);
+            BlobProperties properties = blobClient.getProperties();
+            ObjectFile objectFile = new ObjectFile(key, getRelativePath(key), properties.getETag(),
+                    properties.getBlobSize());
+            return new ListObjectsResult(Lists.newArrayList(objectFile), false, null);
+        } catch (BlobStorageException e) {
+            if (e.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+                LOG.warn("NoSuchKey when head object for Azure, subKey={}", subKey);
+                return new ListObjectsResult(Lists.newArrayList(), false, null);
+            }
+            LOG.warn("Failed to head object for Azure, subKey={}", subKey, e);
+            throw new DdlException(
+                    "Failed to head object for Azure, subKey=" + subKey + " Error message=" + e.getMessage());
+        }
+    }
+
+    /**
+     * Creates a user delegation SAS for the container with read, write and list permission,
+     * valid for {@code SESSION_EXPIRE_SECOND} seconds.
+     *
+     * @return a triple of {@code obj.getAk()}, {@code obj.getSk()} and the SAS token
+     * @throws DdlException if anything goes wrong while requesting the key or signing
+     */
+    @Override
+    public Triple<String, String, String> getStsToken() throws DdlException {
+        try {
+            BlobContainerClient containerClient = buildContainerClient(false);
+            BlobServiceClient blobServiceClient = containerClient.getServiceClient();
+
+            OffsetDateTime keyStart = OffsetDateTime.now();
+            OffsetDateTime keyExpiry = keyStart.plusSeconds(SESSION_EXPIRE_SECOND);
+            UserDelegationKey userDelegationKey = blobServiceClient.getUserDelegationKey(keyStart, keyExpiry);
+
+            OffsetDateTime expiryTime = OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND);
+            BlobContainerSasPermission permission = new BlobContainerSasPermission()
+                    .setReadPermission(true)
+                    .setWritePermission(true)
+                    .setListPermission(true);
+
+            BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(expiryTime, permission)
+                    .setProtocol(SasProtocol.HTTPS_ONLY)
+                    .setStartTime(OffsetDateTime.now());
+
+            String sasToken = containerClient.generateUserDelegationSas(sasValues, userDelegationKey);
+            return Triple.of(obj.getAk(), obj.getSk(), sasToken);
+        } catch (Throwable e) {
+            LOG.warn("Failed get Azure sts token", e);
+            throw new DdlException(e.getMessage());
+        }
+    }
+
+    /**
+     * Deletes the given blobs from the container using batch requests of at most
+     * {@code MAX_DELETE_BATCH_SIZE} blobs each.
+     *
+     * @param keys blob names to delete; validated by {@code checkDeleteKeys} first
+     * @throws DdlException if a batch answers with an unexpected status or Azure reports a storage error
+     */
+    @Override
+    public void deleteObjects(List<String> keys) throws DdlException {
+        checkDeleteKeys(keys);
+        initClient();
+
+        try {
+            BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(
+                    client.getServiceClient()).buildClient();
+            // Step by batch size so that no empty trailing batch is submitted when
+            // keys.size() is an exact multiple of the batch size.
+            for (int from = 0; from < keys.size(); from += MAX_DELETE_BATCH_SIZE) {
+                int to = Math.min(from + MAX_DELETE_BATCH_SIZE, keys.size());
+                BlobBatch batch = blobBatchClient.getBlobBatch();
+                for (String key : keys.subList(from, to)) {
+                    batch.deleteBlob(client.getBlobContainerName(), key);
+                }
+                Response<Void> resp = blobBatchClient.submitBatchWithResponse(batch, true, null, Context.NONE);
+                int status = resp.getStatusCode();
+                // NOTE(review): the Blob Batch REST API documents 202 Accepted as the success status.
+                if (status != HttpStatus.SC_OK && status != HttpStatus.SC_ACCEPTED) {
+                    throw new DdlException(
+                            "Failed delete objects, bucket=" + obj.getBucket());
+                }
+            }
+        } catch (BlobStorageException e) {
+            LOG.warn("Failed to delete objects for Azure", e);
+            throw new DdlException("Failed to delete objects for Azure, Error message=" + e.getMessage());
+        }
+    }
+
+    @Override
+    public void close() {
+        client = null;
+    }
+
+    @Override
+    public String toString() {
+        return "AzureRemote{obj=" + obj + '}';
+    }
+
+    /**
+     * Fetches one page of blobs whose names start with {@code prefix}.
+     *
+     * @param prefix blob name prefix to list
+     * @param continuationToken token handed to Azure to select the page to fetch
+     * @return the blobs of that page, the truncated flag, and the token for the next page
+     * @throws DdlException if Azure reports a storage error
+     */
+    private ListObjectsResult listObjectsInner(String prefix, String continuationToken) throws DdlException {
+        initClient();
+        try {
+            ListBlobsOptions options = new ListBlobsOptions().setPrefix(prefix);
+            PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, continuationToken, null);
+            PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
+            List<ObjectFile> objectFiles = new ArrayList<>();
+
+            for (BlobItem blobItem : pagedResponse.getElements()) {
+                objectFiles.add(new ObjectFile(blobItem.getName(), getRelativePath(blobItem.getName()),
+                        blobItem.getProperties().getETag(), blobItem.getProperties().getContentLength()));
+            }
+            // A non-null continuation token means more pages remain. NOTE(review): assumes the second
+            // constructor argument is the "truncated" flag (headObject passes false with a null token).
+            String nextToken = pagedResponse.getContinuationToken();
+            return new ListObjectsResult(objectFiles, nextToken != null, nextToken);
+        } catch (BlobStorageException e) {
+            LOG.warn("Failed to list objects for Azure", e);
+            throw new DdlException("Failed to list objects for Azure, Error message=" + e.getMessage());
+        }
+    }
+
+    /** Creates the cached container client on first use. */
+    private void initClient() {
+        if (client == null) {
+            client = buildContainerClient(true);
+        }
+    }
+
+    /**
+     * Builds a client for the container {@code obj.getBucket()} of account {@code obj.getAk()}.
+     *
+     * @param preferToken when true and {@code obj.getToken()} is non-null, authenticate with that
+     *                    token; otherwise authenticate with the account key {@code obj.getSk()}
+     * @return a newly built container client
+     */
+    private BlobContainerClient buildContainerClient(boolean preferToken) {
+        BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
+        if (preferToken && obj.getToken() != null) {
+            builder.credential(new SimpleTokenCredential(obj.getToken()));
+        } else {
+            builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
+        }
+        builder.endpoint(String.format(URI_TEMPLATE, obj.getAk(), obj.getBucket()));
+        return builder.buildClient();
+    }
+
+    // TokenCredential that always hands out the fixed token it was constructed with.
+    private static class SimpleTokenCredential implements TokenCredential {
+        private static final Logger LOG = LogManager.getLogger(SimpleTokenCredential.class);
+        private final String token;
+
+        SimpleTokenCredential(String token) {
+            this.token = token;
+        }
+
+        @Override
+        public Mono<AccessToken> getToken(TokenRequestContext request) {
+            LOG.info("Getting token for scopes: {}", String.join(", ", request.getScopes()));
+            // Assume the token stays valid for SESSION_EXPIRE_SECOND seconds from the current time
+            return Mono.just(new AccessToken(token, OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND)));
+        }
+    }
+}
diff
--git a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java index c55eb9031fa8c8..b3da80e9bb6033 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java @@ -42,7 +42,8 @@ public String getPresignedUrl(String fileName) { config.setCredentials(new DefaultBceCredentials(obj.getAk(), obj.getSk())); config.setEndpoint(obj.getEndpoint()); BosClient client = new BosClient(config); - URL url = client.generatePresignedUrl(obj.getBucket(), normalizePrefix(fileName), 3600, HttpMethodName.PUT); + URL url = client.generatePresignedUrl(obj.getBucket(), normalizePrefix(fileName), + (int) SESSION_EXPIRE_SECOND, HttpMethodName.PUT); LOG.info("Bos getPresignedUrl: {}", url); return url.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java index 7d1aa27b7d9351..e541014e78efd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java @@ -56,7 +56,7 @@ public String getPresignedUrl(String fileName) { clientConfig.setRegion(new Region(obj.getRegion())); clientConfig.setHttpProtocol(HttpProtocol.https); COSClient cosClient = new COSClient(cred, clientConfig); - Date expirationDate = new Date(System.currentTimeMillis() + 60 * 60 * 1000); + Date expirationDate = new Date(System.currentTimeMillis() + SESSION_EXPIRE_SECOND * 1000); URL url = cosClient.generatePresignedUrl(obj.getBucket(), normalizePrefix(fileName), expirationDate, HttpMethodName.PUT, new HashMap(), new HashMap()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java index 6f8e332f6f4111..053066c11bc33f 100644 ---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java @@ -55,8 +55,8 @@ public String getPresignedUrl(String fileName) { String sk = obj.getSk(); ObsClient obsClient = new ObsClient(ak, sk, endPoint); - long expireSeconds = 3600L; - TemporarySignatureRequest request = new TemporarySignatureRequest(HttpMethodEnum.PUT, expireSeconds); + TemporarySignatureRequest request = new TemporarySignatureRequest( + HttpMethodEnum.PUT, SESSION_EXPIRE_SECOND); request.setBucketName(obj.getBucket()); request.setObjectKey(normalizePrefix(fileName)); request.setHeaders(new HashMap()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java index 139d2bbd415adf..42e019c77ba5fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java @@ -65,7 +65,7 @@ public String getPresignedUrl(String fileName) { try { GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucketName, objectName, HttpMethod.PUT); - Date expiration = new Date(new Date().getTime() + 3600 * 1000); + Date expiration = new Date(new Date().getTime() + SESSION_EXPIRE_SECOND * 1000); request.setExpiration(expiration); URL signedUrl = ossClient.generatePresignedUrl(request); return signedUrl.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java index e146e52f534d94..b15c9fb35a8b68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java @@ -116,6 +116,8 @@ public String toString() { public ObjectInfo obj; + protected static long SESSION_EXPIRE_SECOND = 3600; + public RemoteBase(ObjectInfo obj) { 
this.obj = obj; } @@ -149,6 +151,8 @@ public static RemoteBase newInstance(ObjectInfo obj) throws Exception { return new ObsRemote(obj); case BOS: return new BosRemote(obj); + case AZURE: + return new AzureRemote(obj); default: throw new Exception("current not support obj : " + obj.toString()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java index 49d82ab5ae26d6..35e9dc7b39a64a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java @@ -54,7 +54,7 @@ public String getPresignedUrl(String fileName) { .build(); PutObjectPresignRequest presignRequest = PutObjectPresignRequest.builder() - .signatureDuration(Duration.ofMinutes(60)) + .signatureDuration(Duration.ofSeconds(SESSION_EXPIRE_SECOND)) .putObjectRequest(objectRequest) .build();