Skip to content

Commit

Permalink
Translate failures for exchange spooling on Azure
Browse files Browse the repository at this point in the history
  • Loading branch information
linzebing authored and arhimondr committed May 31, 2022
1 parent 2df4445 commit 97dce6a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.exchange.filesystem;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import java.io.IOException;

import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.asVoid;

public final class FileSystemExchangeFutures
{
private FileSystemExchangeFutures() {}

// Helper function that translates exception and transform future type to avoid abstraction leak
public static ListenableFuture<Void> translateFailures(ListenableFuture<?> listenableFuture)
{
return asVoid(Futures.catchingAsync(listenableFuture, Throwable.class, throwable -> {
if (throwable instanceof Error || throwable instanceof IOException) {
return immediateFailedFuture(throwable);
}
else {
return immediateFailedFuture(new IOException(throwable));
}
}, directExecutor()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeFutures.translateFailures;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
Expand Down Expand Up @@ -153,7 +154,7 @@ public ListenableFuture<Void> createEmptyFile(URI file)
{
String containerName = getContainerName(file);
String blobName = getPath(file);
return asVoid(toListenableFuture(blobServiceAsyncClient
return translateFailures(toListenableFuture(blobServiceAsyncClient
.getBlobContainerAsyncClient(containerName)
.getBlobAsyncClient(blobName)
.upload(BinaryData.fromString(""))
Expand Down Expand Up @@ -188,7 +189,7 @@ public ListenableFuture<Void> deleteRecursively(List<URI> directories)
directExecutor()));
}

return asVoid(Futures.allAsList(deleteObjectsFutures.build()));
return translateFailures(Futures.allAsList(deleteObjectsFutures.build()));
}

@Override
Expand Down Expand Up @@ -521,15 +522,15 @@ public ListenableFuture<Void> write(Slice slice)

// Skip multipart upload if there would only be one part
if (slice.length() < blockSize && multiPartUploadFutures.isEmpty()) {
directUploadFuture = asVoid(toListenableFuture(blockBlobAsyncClient.upload(Flux.just(slice.toByteBuffer()), slice.length()).toFuture()));
directUploadFuture = translateFailures(toListenableFuture(blockBlobAsyncClient.upload(Flux.just(slice.toByteBuffer()), slice.length()).toFuture()));
return directUploadFuture;
}

String blockId = Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
ListenableFuture<Void> uploadFuture = toListenableFuture(blockBlobAsyncClient.stageBlock(blockId, Flux.just(slice.toByteBuffer()), slice.length()).toFuture());
multiPartUploadFutures.add(uploadFuture);
blockIds.add(blockId);
return uploadFuture;
return translateFailures(uploadFuture);
}

@Override
Expand All @@ -543,10 +544,10 @@ public ListenableFuture<Void> finish()
return requireNonNullElseGet(directUploadFuture, Futures::immediateVoidFuture);
}

ListenableFuture<Void> finishFuture = Futures.transformAsync(
ListenableFuture<Void> finishFuture = translateFailures(Futures.transformAsync(
Futures.allAsList(multiPartUploadFutures),
ignored -> asVoid(toListenableFuture(blockBlobAsyncClient.commitBlockList(blockIds).toFuture())),
directExecutor());
ignored -> toListenableFuture(blockBlobAsyncClient.commitBlockList(blockIds).toFuture()),
directExecutor()));
Futures.addCallback(finishFuture, new FutureCallback<>() {
@Override
public void onSuccess(Void result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static io.airlift.concurrent.MoreFutures.asVoid;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeFutures.translateFailures;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR;
import static io.trino.plugin.exchange.filesystem.s3.S3FileSystemExchangeStorage.CompatibilityMode.GCP;
import static io.trino.plugin.exchange.filesystem.s3.S3RequestUtil.configureEncryption;
Expand Down Expand Up @@ -238,7 +238,7 @@ public ListenableFuture<Void> createEmptyFile(URI file)
.key(keyFromUri(file))
.build();

return stats.getCreateEmptyFile().record(transformFuture(toListenableFuture(s3AsyncClient.putObject(request, AsyncRequestBody.empty()))));
return stats.getCreateEmptyFile().record(translateFailures(toListenableFuture(s3AsyncClient.putObject(request, AsyncRequestBody.empty()))));
}

@Override
Expand Down Expand Up @@ -285,7 +285,7 @@ public ListenableFuture<Void> deleteRecursively(List<URI> directories)
.collect(toImmutableList())),
directExecutor()));
}
return transformFuture(Futures.allAsList(deleteObjectsFutures.build()));
return translateFailures(Futures.allAsList(deleteObjectsFutures.build()));
}
}

Expand Down Expand Up @@ -456,19 +456,6 @@ private static String keyFromUri(URI uri)
return key;
}

// Helper function that translates exception and transform future type to avoid abstraction leak
private static ListenableFuture<Void> transformFuture(ListenableFuture<?> listenableFuture)
{
return asVoid(Futures.catchingAsync(listenableFuture, Throwable.class, throwable -> {
if (throwable instanceof Error || throwable instanceof IOException) {
return immediateFailedFuture(throwable);
}
else {
return immediateFailedFuture(new IOException(throwable));
}
}, directExecutor()));
}

private static boolean isDirectory(URI uri)
{
return uri.toString().endsWith(PATH_SEPARATOR);
Expand Down Expand Up @@ -787,7 +774,7 @@ public ListenableFuture<Void> write(Slice slice)
.key(key)
.storageClass(storageClass);
configureEncryption(secretKey, putObjectRequestBuilder);
directUploadFuture = transformFuture(toListenableFuture(s3AsyncClient.putObject(putObjectRequestBuilder.build(),
directUploadFuture = translateFailures(toListenableFuture(s3AsyncClient.putObject(putObjectRequestBuilder.build(),
ByteBufferAsyncRequestBody.fromByteBuffer(slice.toByteBuffer()))));
stats.getPutObject().record(directUploadFuture);
stats.getPutObjectDataSizeInBytes().add(slice.length());
Expand All @@ -802,7 +789,7 @@ public ListenableFuture<Void> write(Slice slice)
ListenableFuture<CompletedPart> uploadFuture = Futures.transformAsync(multiPartUploadIdFuture, uploadId -> uploadPart(uploadId, slice, partNum), directExecutor());
multiPartUploadFutures.add(uploadFuture);

return transformFuture(uploadFuture);
return translateFailures(uploadFuture);
}

@Override
Expand All @@ -816,7 +803,7 @@ public ListenableFuture<Void> finish()
return requireNonNullElseGet(directUploadFuture, Futures::immediateVoidFuture);
}

ListenableFuture<Void> finishFuture = transformFuture(Futures.transformAsync(
ListenableFuture<Void> finishFuture = translateFailures(Futures.transformAsync(
Futures.allAsList(multiPartUploadFutures),
completedParts -> completeMultipartUpload(getFutureValue(multiPartUploadIdFuture), completedParts),
directExecutor()));
Expand Down Expand Up @@ -853,7 +840,7 @@ public ListenableFuture<Void> abort()

verify(directUploadFuture == null);
multiPartUploadFutures.forEach(future -> future.cancel(true));
return transformFuture(Futures.transformAsync(multiPartUploadIdFuture, this::abortMultipartUpload, directExecutor()));
return translateFailures(Futures.transformAsync(multiPartUploadIdFuture, this::abortMultipartUpload, directExecutor()));
}

@Override
Expand Down

0 comments on commit 97dce6a

Please sign in to comment.