Skip to content

Commit

Permalink
[feature](Azure) Support copy into on Azure Blob Storage (#36554)
Browse files Browse the repository at this point in the history
  • Loading branch information
ByteYue authored and dataroaring committed Jun 26, 2024
1 parent 88b16a2 commit fc78ea4
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ private void analyzeStagePB(StagePB stagePB) throws AnalysisException {
}
brokerProperties.put(S3_BUCKET, objInfo.getBucket());
brokerProperties.put(S3_PREFIX, objInfo.getPrefix());
brokerProperties.put(S3Properties.PROVIDER, objInfo.getProvider().toString());
StageProperties stageProperties = new StageProperties(stagePB.getPropertiesMap());
this.copyIntoProperties.mergeProperties(stageProperties);
this.copyIntoProperties.analyze();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.storage;

import org.apache.doris.common.DdlException;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.models.UserDelegationKey;
import com.azure.storage.blob.sas.BlobContainerSasPermission;
import com.azure.storage.blob.sas.BlobSasPermission;
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.sas.SasProtocol;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.http.HttpStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Mono;

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;

public class AzureRemote extends RemoteBase {

private static final Logger LOG = LogManager.getLogger(AzureRemote.class);

private static final String URI_TEMPLATE = "https://%s.blob.core.windows.net/%s";

private BlobContainerClient client;

public AzureRemote(ObjectInfo obj) {
super(obj);
}

@Override
public String getPresignedUrl(String fileName) {
try {
BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
String containerName = obj.getBucket();
String uri = String.format(URI_TEMPLATE, obj.getAk(),
containerName);
builder.endpoint(uri);
BlobContainerClient containerClient = builder.buildClient();

BlobClient blobClient = containerClient.getBlobClient(normalizePrefix(fileName));

OffsetDateTime expiryTime = OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND);
BlobSasPermission permission = new BlobSasPermission()
.setReadPermission(true)
.setWritePermission(true)
.setDeletePermission(true);

BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(expiryTime, permission)
.setProtocol(SasProtocol.HTTPS_ONLY)
.setStartTime(OffsetDateTime.now());

String sasToken = blobClient.generateSas(sasValues);
return blobClient.getBlobUrl() + "?" + sasToken;
} catch (Exception e) {
e.getStackTrace();
}
return "";
}

@Override
public ListObjectsResult listObjects(String continuationToken) throws DdlException {
return listObjectsInner(normalizePrefix(), continuationToken);
}

@Override
public ListObjectsResult listObjects(String subPrefix, String continuationToken) throws DdlException {
return listObjectsInner(normalizePrefix(subPrefix), continuationToken);
}

@Override
public ListObjectsResult headObject(String subKey) throws DdlException {
initClient();
try {
String key = normalizePrefix(subKey);
BlobClient blobClient = client.getBlobClient(key);
BlobProperties properties = blobClient.getProperties();
ObjectFile objectFile = new ObjectFile(key, getRelativePath(key), properties.getETag(),
properties.getBlobSize());
return new ListObjectsResult(Lists.newArrayList(objectFile), false, null);
} catch (BlobStorageException e) {
if (e.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
LOG.warn("NoSuchKey when head object for Azure, subKey={}", subKey);
return new ListObjectsResult(Lists.newArrayList(), false, null);
}
LOG.warn("Failed to head object for Azure, subKey={}", subKey, e);
throw new DdlException(
"Failed to head object for Azure, subKey=" + subKey + " Error message=" + e.getMessage());
}
}

@Override
public Triple<String, String, String> getStsToken() throws DdlException {
try {
BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
String containerName = obj.getBucket();
String uri = String.format(URI_TEMPLATE, obj.getAk(),
containerName);
builder.endpoint(uri);
BlobContainerClient containerClient = builder.buildClient();
BlobServiceClient blobServiceClient = containerClient.getServiceClient();

OffsetDateTime keyStart = OffsetDateTime.now();
OffsetDateTime keyExpiry = keyStart.plusSeconds(SESSION_EXPIRE_SECOND);
UserDelegationKey userDelegationKey = blobServiceClient.getUserDelegationKey(keyStart, keyExpiry);

OffsetDateTime expiryTime = OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND);
BlobContainerSasPermission permission = new BlobContainerSasPermission()
.setReadPermission(true)
.setWritePermission(true)
.setListPermission(true);

BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(expiryTime, permission)
.setProtocol(SasProtocol.HTTPS_ONLY)
.setStartTime(OffsetDateTime.now());

String sasToken = containerClient.generateUserDelegationSas(sasValues, userDelegationKey);
return Triple.of(obj.getAk(), obj.getSk(), sasToken);
} catch (Throwable e) {
LOG.warn("Failed get Azure sts token", e);
throw new DdlException(e.getMessage());
}
}

@Override
public void deleteObjects(List<String> keys) throws DdlException {
checkDeleteKeys(keys);
initClient();

try {
BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(
client.getServiceClient()).buildClient();
int maxDelete = 1000;
for (int i = 0; i < keys.size() / maxDelete + 1; i++) {
int cnt = 0;
BlobBatch batch = blobBatchClient.getBlobBatch();
for (int j = maxDelete * i; j < keys.size() && cnt < maxDelete; j++) {
batch.deleteBlob(client.getBlobContainerName(), keys.get(j));
cnt++;
}
Response<Void> resp = blobBatchClient.submitBatchWithResponse(batch, true, null, Context.NONE);
if (resp.getStatusCode() != HttpStatus.SC_OK) {
throw new DdlException(
"Failed delete objects, bucket=" + obj.getBucket());
}
}
} catch (BlobStorageException e) {
LOG.warn("Failed to delete objects for Azure", e);
throw new DdlException("Failed to delete objects for Azure, Error message=" + e.getMessage());
}
}

@Override
public void close() {
client = null;
}

@Override
public String toString() {
return "AzureRemote{obj=" + obj + '}';
}

private ListObjectsResult listObjectsInner(String prefix, String continuationToken) throws DdlException {
initClient();
try {
ListBlobsOptions options = new ListBlobsOptions().setPrefix(prefix);
PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, continuationToken, null);
PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
List<ObjectFile> objectFiles = new ArrayList<>();

for (BlobItem blobItem : pagedResponse.getElements()) {
objectFiles.add(new ObjectFile(blobItem.getName(), getRelativePath(blobItem.getName()),
blobItem.getProperties().getETag(), blobItem.getProperties().getContentLength()));
}
return new ListObjectsResult(objectFiles, pagedResponse.getContinuationToken() == null,
pagedResponse.getContinuationToken());
} catch (BlobStorageException e) {
LOG.warn("Failed to list objects for Azure", e);
throw new DdlException("Failed to list objects for Azure, Error message=" + e.getMessage());
}
}

private void initClient() {
if (client == null) {
BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
if (obj.getToken() != null) {
builder.credential(new SimpleTokenCredential(obj.getToken()));
} else {
builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
}
String containerName = obj.getBucket();
String uri = String.format(URI_TEMPLATE, obj.getAk(),
containerName);
builder.endpoint(uri);
client = builder.buildClient();
}
}

// Custom implementation of TokenCredential
private static class SimpleTokenCredential implements TokenCredential {
private static final Logger LOG = LogManager.getLogger(SimpleTokenCredential.class);
private final String token;

SimpleTokenCredential(String token) {
this.token = token;
}

@Override
public Mono<AccessToken> getToken(TokenRequestContext request) {
LOG.info("Getting token for scopes: {}", String.join(", ", request.getScopes()));
// Assume the token is valid for 1 hour from the current time
return Mono.just(new AccessToken(token, OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public String getPresignedUrl(String fileName) {
config.setCredentials(new DefaultBceCredentials(obj.getAk(), obj.getSk()));
config.setEndpoint(obj.getEndpoint());
BosClient client = new BosClient(config);
URL url = client.generatePresignedUrl(obj.getBucket(), normalizePrefix(fileName), 3600, HttpMethodName.PUT);
URL url = client.generatePresignedUrl(obj.getBucket(), normalizePrefix(fileName),
(int) SESSION_EXPIRE_SECOND, HttpMethodName.PUT);
LOG.info("Bos getPresignedUrl: {}", url);
return url.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public String getPresignedUrl(String fileName) {
clientConfig.setRegion(new Region(obj.getRegion()));
clientConfig.setHttpProtocol(HttpProtocol.https);
COSClient cosClient = new COSClient(cred, clientConfig);
Date expirationDate = new Date(System.currentTimeMillis() + 60 * 60 * 1000);
Date expirationDate = new Date(System.currentTimeMillis() + SESSION_EXPIRE_SECOND);
URL url = cosClient.generatePresignedUrl(obj.getBucket(),
normalizePrefix(fileName), expirationDate, HttpMethodName.PUT,
new HashMap<String, String>(), new HashMap<String, String>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public String getPresignedUrl(String fileName) {
String sk = obj.getSk();

ObsClient obsClient = new ObsClient(ak, sk, endPoint);
long expireSeconds = 3600L;
TemporarySignatureRequest request = new TemporarySignatureRequest(HttpMethodEnum.PUT, expireSeconds);
TemporarySignatureRequest request = new TemporarySignatureRequest(
HttpMethodEnum.PUT, SESSION_EXPIRE_SECOND);
request.setBucketName(obj.getBucket());
request.setObjectKey(normalizePrefix(fileName));
request.setHeaders(new HashMap<String, String>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public String getPresignedUrl(String fileName) {
try {
GeneratePresignedUrlRequest request
= new GeneratePresignedUrlRequest(bucketName, objectName, HttpMethod.PUT);
Date expiration = new Date(new Date().getTime() + 3600 * 1000);
Date expiration = new Date(new Date().getTime() + SESSION_EXPIRE_SECOND * 1000);
request.setExpiration(expiration);
URL signedUrl = ossClient.generatePresignedUrl(request);
return signedUrl.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public String toString() {

public ObjectInfo obj;

protected static long SESSION_EXPIRE_SECOND = 3600;

public RemoteBase(ObjectInfo obj) {
this.obj = obj;
}
Expand Down Expand Up @@ -149,6 +151,8 @@ public static RemoteBase newInstance(ObjectInfo obj) throws Exception {
return new ObsRemote(obj);
case BOS:
return new BosRemote(obj);
case AZURE:
return new AzureRemote(obj);
default:
throw new Exception("current not support obj : " + obj.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public String getPresignedUrl(String fileName) {
.build();

PutObjectPresignRequest presignRequest = PutObjectPresignRequest.builder()
.signatureDuration(Duration.ofMinutes(60))
.signatureDuration(Duration.ofSeconds(SESSION_EXPIRE_SECOND))
.putObjectRequest(objectRequest)
.build();

Expand Down

0 comments on commit fc78ea4

Please sign in to comment.