Skip to content

Commit

Permalink
repository-azure: revert the fix for #1734 once upstream solution is …
Browse files Browse the repository at this point in the history
…available (#2446) (#2475)

Signed-off-by: Andriy Redko <[email protected]>
(cherry picked from commit 12dd5d7)

Co-authored-by: Andriy Redko <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and reta authored Mar 16, 2022
1 parent bdca667 commit b76e7ae
Show file tree
Hide file tree
Showing 16 changed files with 73 additions and 136 deletions.
14 changes: 7 additions & 7 deletions plugins/repository-azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@ opensearchplugin {
}

dependencies {
api 'com.azure:azure-core:1.22.0'
api 'com.azure:azure-storage-common:12.14.3'
api 'com.azure:azure-core-http-netty:1.11.7'
api 'com.azure:azure-core:1.26.0'
api 'com.azure:azure-storage-common:12.15.0'
api 'com.azure:azure-core-http-netty:1.11.8'
api "io.netty:netty-codec-dns:${versions.netty}"
api "io.netty:netty-codec-socks:${versions.netty}"
api "io.netty:netty-codec-http2:${versions.netty}"
api "io.netty:netty-handler-proxy:${versions.netty}"
api "io.netty:netty-resolver-dns:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
implementation project(':modules:transport-netty4')
api 'com.azure:azure-storage-blob:12.14.1'
api 'com.azure:azure-storage-blob:12.14.4'
api 'org.reactivestreams:reactive-streams:1.0.3'
api 'io.projectreactor:reactor-core:3.4.15'
api 'io.projectreactor.netty:reactor-netty:1.0.13'
api 'io.projectreactor.netty:reactor-netty-core:1.0.13'
api 'io.projectreactor.netty:reactor-netty-http:1.0.13'
api 'io.projectreactor.netty:reactor-netty:1.0.16'
api 'io.projectreactor.netty:reactor-netty-core:1.0.16'
api 'io.projectreactor.netty:reactor-netty-http:1.0.16'
api "org.slf4j:slf4j-api:${versions.slf4j}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
461b89dcf8948a0c4a97d4f1d876f778d0cac7aa

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0ea66d4531fb41cb3b5ab55e2e7b7f301e7f8503

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2b92020693d09e4980b96d278e8038a1087afea0

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
4d63ce8bbd20379c5e5262b1204ceac7b31a7743

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
d90829f6127966b0c35c4a3e8e23ca9ed29cd8a5

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
8f842a912677f2bc614ff60fb9e786d4fa429c34

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
93edb9a1dc774d843551a616e0f316e11ffa81ed
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
Expand All @@ -52,7 +51,6 @@
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.common.implementation.Constants;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
Expand Down Expand Up @@ -84,7 +82,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -220,71 +217,50 @@ public DeleteResult deleteBlobDirectory(String path, Executor executor) throws U
final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(path);

SocketAccess.doPrivilegedVoidException(() -> {
String continuationToken = null;

do {
// Fetch one page at a time, others are going to be fetched by continuation token
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
// gets addressed.
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobs(listBlobsOptions, timeout())
.streamByPage(continuationToken)
.findFirst();

if (!pageOpt.isPresent()) {
// No more pages, should never happen
break;
}

final PagedResponse<BlobItem> page = pageOpt.get();
for (final BlobItem blobItem : page.getValue()) {
// Skipping prefixes as those are not deletable and should not be there
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";

outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len = blobItem.getProperties().getContentLength();

final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
logger.trace(
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
);
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
logger.trace(
() -> new ParameterizedMessage(
"container [{}]: blob [{}] deleted status [{}].",
container,
blobItem.getName(),
response.getStatusCode()
)
);

blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
}
for (final BlobItem blobItem : blobContainer.listBlobs(listBlobsOptions, timeout())) {
// Skipping prefixes as those are not deletable and should not be there
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";

outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len = blobItem.getProperties().getContentLength();

final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
logger.trace(
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
);
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
logger.trace(
() -> new ParameterizedMessage(
"container [{}]: blob [{}] deleted status [{}].",
container,
blobItem.getName(),
response.getStatusCode()
)
);

blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
}
}

@Override
public void onFailure(Exception e) {
exceptions.add(e);
}
@Override
public void onFailure(Exception e) {
exceptions.add(e);
}

@Override
public void onAfter() {
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
@Override
public void onAfter() {
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
});
}

// Fetch next continuation token
continuationToken = page.getContinuationToken();
} while (StringUtils.isNotBlank(continuationToken));
}
});
}
});

if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
Expand Down Expand Up @@ -325,39 +301,19 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix
.setPrefix(keyPath + (prefix == null ? "" : prefix));

SocketAccess.doPrivilegedVoidException(() -> {
String continuationToken = null;

do {
// Fetch one page at a time, others are going to be fetched by continuation token
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
// gets addressed
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
.streamByPage(continuationToken)
.findFirst();

if (!pageOpt.isPresent()) {
// No more pages, should never happen
break;
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
// Skipping over the prefixes, only look for the blobs
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
continue;
}

final PagedResponse<BlobItem> page = pageOpt.get();
for (final BlobItem blobItem : page.getValue()) {
// Skipping over the prefixes, only look for the blobs
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
continue;
}
final String name = getBlobName(blobItem.getName(), container, keyPath);
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));

final String name = getBlobName(blobItem.getName(), container, keyPath);
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));

final BlobItemProperties properties = blobItem.getProperties();
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
}

// Fetch next continuation token
continuationToken = page.getContinuationToken();
} while (StringUtils.isNotBlank(continuationToken));
final BlobItemProperties properties = blobItem.getProperties();
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
}
});

return MapBuilder.newMapBuilder(blobsBuilder).immutableMap();
Expand All @@ -373,36 +329,17 @@ public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxExcept
.setPrefix(keyPath);

SocketAccess.doPrivilegedVoidException(() -> {
String continuationToken = null;

do {
// Fetch one page at a time, others are going to be fetched by continuation token
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
// gets addressed
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
.streamByPage(continuationToken)
.findFirst();

if (!pageOpt.isPresent()) {
// No more pages, should never happen
break;
}

final PagedResponse<BlobItem> page = pageOpt.get();
for (final BlobItem blobItem : page.getValue()) {
// Skipping over the blobs, only look for prefixes
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
// Lastly, we add the length of keyPath to the offset to strip this container's path.
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
blobsBuilder.add(name);
}
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
// Skipping over the blobs, only look for prefixes
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
// Lastly, we add the length of keyPath to the offset to strip this container's path.
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
blobsBuilder.add(name);
}
// Fetch next continuation token
continuationToken = page.getContinuationToken();
} while (StringUtils.isNotBlank(continuationToken));
}
});

return Collections.unmodifiableMap(
Expand Down

0 comments on commit b76e7ae

Please sign in to comment.