diff --git a/CHANGELOG.md b/CHANGELOG.md index 83be0cacf6cf0..bb571cc21cd5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 - Bump `forbiddenapis` from 3.3 to 3.4 -- Bump `avro` from 1.11.0 to 1.11.1 +- Bump `avro` from 1.11.1 to 1.11.2 - Bump `woodstox-core` from 6.3.0 to 6.3.1 - Bump `xmlbeans` from 5.1.0 to 5.1.1 ([#4354](https://github.com/opensearch-project/OpenSearch/pull/4354)) - Bump `reactor-netty-core` from 1.0.19 to 1.0.22 ([#4447](https://github.com/opensearch-project/OpenSearch/pull/4447)) @@ -77,6 +77,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) - Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230)) - Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541)) +- Adds log4j configuration for telemetry LogSpanExporter ([#8393](https://github.com/opensearch-project/OpenSearch/pull/8393)) ### Security @@ -95,6 +96,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add API to initialize extensions ([#8029](https://github.com/opensearch-project/OpenSearch/pull/8029)) - Add distributed tracing framework ([#7543](https://github.com/opensearch-project/OpenSearch/issues/7543)) - Enable Point based optimization for custom comparators ([#8168](https://github.com/opensearch-project/OpenSearch/pull/8168)) +- [Extensions] Support extension additional settings with extension REST initialization ([#8414](https://github.com/opensearch-project/OpenSearch/pull/8414)) +- Adds mock implementation for TelemetryPlugin ([#7545](https://github.com/opensearch-project/OpenSearch/issues/7545)) ### Dependencies - Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814) @@ -127,6 +130,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Upgrade] Lucene 9.7.0 release (#8272) - Bump `org.jboss.resteasy:resteasy-jackson2-provider` from 3.0.26.Final to 6.2.4.Final in /qa/wildfly ([#8209](https://github.com/opensearch-project/OpenSearch/pull/8209)) - Bump `com.google.api-client:google-api-client` from 1.34.0 to 2.2.0 ([#8276](https://github.com/opensearch-project/OpenSearch/pull/8276)) +- Update Apache HttpCore / HttpClient and Apache HttpCore5 / HttpClient5 dependencies ([#8434](https://github.com/opensearch-project/OpenSearch/pull/8434)) +- Bump `org.apache.maven:maven-model` from 3.9.2 to 3.9.3 (#8403) ### Changed - Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https://github.com/opensearch-project/OpenSearch/pull/7836)) @@ -163,4 +168,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.8...2.x +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.8...2.x \ No newline at end of file diff --git a/buildSrc/build.gradle
b/buildSrc/build.gradle index 35f3fb87560e7..47d4747d4b9a6 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -117,7 +117,7 @@ dependencies { api 'de.thetaphi:forbiddenapis:3.5.1' api 'com.avast.gradle:gradle-docker-compose-plugin:0.16.12' api "org.yaml:snakeyaml:${props.getProperty('snakeyaml')}" - api 'org.apache.maven:maven-model:3.9.2' + api 'org.apache.maven:maven-model:3.9.3' api 'com.networknt:json-schema-validator:1.0.85' api 'org.jruby.jcodings:jcodings:1.0.58' api 'org.jruby.joni:joni:2.2.1' diff --git a/buildSrc/version.properties b/buildSrc/version.properties index f9eac9516cb18..408b03e60cc5d 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -33,10 +33,10 @@ netty = 4.1.94.Final joda = 2.12.2 # client dependencies -httpclient5 = 5.1.4 -httpcore5 = 5.1.5 -httpclient = 4.5.13 -httpcore = 4.4.15 +httpclient5 = 5.2.1 +httpcore5 = 5.2.2 +httpclient = 4.5.14 +httpcore = 4.4.16 httpasyncclient = 4.1.5 commonslogging = 1.2 commonscodec = 1.15 diff --git a/client/rest/licenses/httpclient5-5.1.4.jar.sha1 b/client/rest/licenses/httpclient5-5.1.4.jar.sha1 deleted file mode 100644 index 3c0cb1335fb88..0000000000000 --- a/client/rest/licenses/httpclient5-5.1.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -208f9eed6d6ab709e2ae7a75b457ef60c0baefa5 \ No newline at end of file diff --git a/client/rest/licenses/httpclient5-5.2.1.jar.sha1 b/client/rest/licenses/httpclient5-5.2.1.jar.sha1 new file mode 100644 index 0000000000000..3555fe22f8e12 --- /dev/null +++ b/client/rest/licenses/httpclient5-5.2.1.jar.sha1 @@ -0,0 +1 @@ +0c900514d3446d9ce5d9dbd90c21192048125440 \ No newline at end of file diff --git a/client/rest/licenses/httpcore5-5.1.5.jar.sha1 b/client/rest/licenses/httpcore5-5.1.5.jar.sha1 deleted file mode 100644 index 8da253152e970..0000000000000 --- a/client/rest/licenses/httpcore5-5.1.5.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -df9da3a1fa2351c4790245400ed28d78a8ddd3fc \ No newline at end of file diff --git a/client/rest/licenses/httpcore5-5.2.2.jar.sha1 b/client/rest/licenses/httpcore5-5.2.2.jar.sha1 new file mode 100644 index 0000000000000..b641256c7d4a4 --- /dev/null +++ b/client/rest/licenses/httpcore5-5.2.2.jar.sha1 @@ -0,0 +1 @@ +6da28f5aa6c2b129ef49632e041a5203ce7507b2 \ No newline at end of file diff --git a/client/rest/licenses/httpcore5-h2-5.1.5.jar.sha1 b/client/rest/licenses/httpcore5-h2-5.1.5.jar.sha1 deleted file mode 100644 index 097e6cc2a3be8..0000000000000 --- a/client/rest/licenses/httpcore5-h2-5.1.5.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -624660339afd5006d427457e6b10b10b32fd86f1 \ No newline at end of file diff --git a/client/rest/licenses/httpcore5-h2-5.2.2.jar.sha1 b/client/rest/licenses/httpcore5-h2-5.2.2.jar.sha1 new file mode 100644 index 0000000000000..94bc0fa49bdb0 --- /dev/null +++ b/client/rest/licenses/httpcore5-h2-5.2.2.jar.sha1 @@ -0,0 +1 @@ +54ee1ed58fe8ac40be1083ea9873a6c734939ab9 \ No newline at end of file diff --git a/client/sniffer/licenses/httpclient5-5.1.4.jar.sha1 b/client/sniffer/licenses/httpclient5-5.1.4.jar.sha1 deleted file mode 100644 index 3c0cb1335fb88..0000000000000 --- a/client/sniffer/licenses/httpclient5-5.1.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -208f9eed6d6ab709e2ae7a75b457ef60c0baefa5 \ No newline at end of file diff --git a/client/sniffer/licenses/httpclient5-5.2.1.jar.sha1 b/client/sniffer/licenses/httpclient5-5.2.1.jar.sha1 new file mode 100644 index 0000000000000..3555fe22f8e12 --- /dev/null +++ b/client/sniffer/licenses/httpclient5-5.2.1.jar.sha1 @@ -0,0 +1 @@ 
+0c900514d3446d9ce5d9dbd90c21192048125440 \ No newline at end of file diff --git a/client/sniffer/licenses/httpcore5-5.1.5.jar.sha1 b/client/sniffer/licenses/httpcore5-5.1.5.jar.sha1 deleted file mode 100644 index 8da253152e970..0000000000000 --- a/client/sniffer/licenses/httpcore5-5.1.5.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -df9da3a1fa2351c4790245400ed28d78a8ddd3fc \ No newline at end of file diff --git a/client/sniffer/licenses/httpcore5-5.2.2.jar.sha1 b/client/sniffer/licenses/httpcore5-5.2.2.jar.sha1 new file mode 100644 index 0000000000000..b641256c7d4a4 --- /dev/null +++ b/client/sniffer/licenses/httpcore5-5.2.2.jar.sha1 @@ -0,0 +1 @@ +6da28f5aa6c2b129ef49632e041a5203ce7507b2 \ No newline at end of file diff --git a/plugins/discovery-azure-classic/licenses/httpclient-4.5.13.jar.sha1 b/plugins/discovery-azure-classic/licenses/httpclient-4.5.13.jar.sha1 deleted file mode 100644 index 3281e21595b39..0000000000000 --- a/plugins/discovery-azure-classic/licenses/httpclient-4.5.13.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e5f6cae5ca7ecaac1ec2827a9e2d65ae2869cada \ No newline at end of file diff --git a/plugins/discovery-azure-classic/licenses/httpclient-4.5.14.jar.sha1 b/plugins/discovery-azure-classic/licenses/httpclient-4.5.14.jar.sha1 new file mode 100644 index 0000000000000..66e05851c2e3c --- /dev/null +++ b/plugins/discovery-azure-classic/licenses/httpclient-4.5.14.jar.sha1 @@ -0,0 +1 @@ +1194890e6f56ec29177673f2f12d0b8e627dec98 \ No newline at end of file diff --git a/plugins/discovery-azure-classic/licenses/httpcore-4.4.15.jar.sha1 b/plugins/discovery-azure-classic/licenses/httpcore-4.4.15.jar.sha1 deleted file mode 100644 index 42a03b5d7a376..0000000000000 --- a/plugins/discovery-azure-classic/licenses/httpcore-4.4.15.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7f2e0c573eaa7a74bac2e89b359e1f73d92a0a1d \ No newline at end of file diff --git a/plugins/discovery-azure-classic/licenses/httpcore-4.4.16.jar.sha1 b/plugins/discovery-azure-classic/licenses/httpcore-4.4.16.jar.sha1 new file mode 100644 index 0000000000000..172110694b5bd --- /dev/null +++ b/plugins/discovery-azure-classic/licenses/httpcore-4.4.16.jar.sha1 @@ -0,0 +1 @@ +51cf043c87253c9f58b539c9f7e44c8894223850 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/httpclient-4.5.13.jar.sha1 b/plugins/discovery-ec2/licenses/httpclient-4.5.13.jar.sha1 deleted file mode 100644 index 3281e21595b39..0000000000000 --- a/plugins/discovery-ec2/licenses/httpclient-4.5.13.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e5f6cae5ca7ecaac1ec2827a9e2d65ae2869cada \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/httpclient-4.5.14.jar.sha1 b/plugins/discovery-ec2/licenses/httpclient-4.5.14.jar.sha1 new file mode 100644 index 0000000000000..66e05851c2e3c --- /dev/null +++ b/plugins/discovery-ec2/licenses/httpclient-4.5.14.jar.sha1 @@ -0,0 +1 @@ +1194890e6f56ec29177673f2f12d0b8e627dec98 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/httpcore-4.4.15.jar.sha1 b/plugins/discovery-ec2/licenses/httpcore-4.4.15.jar.sha1 deleted file mode 100644 index 42a03b5d7a376..0000000000000 --- a/plugins/discovery-ec2/licenses/httpcore-4.4.15.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7f2e0c573eaa7a74bac2e89b359e1f73d92a0a1d \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/httpcore-4.4.16.jar.sha1 b/plugins/discovery-ec2/licenses/httpcore-4.4.16.jar.sha1 new file mode 100644 index 0000000000000..172110694b5bd --- /dev/null +++ b/plugins/discovery-ec2/licenses/httpcore-4.4.16.jar.sha1 @@ -0,0 +1 @@ 
+51cf043c87253c9f58b539c9f7e44c8894223850 \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/httpclient-4.5.13.jar.sha1 b/plugins/discovery-gce/licenses/httpclient-4.5.13.jar.sha1 deleted file mode 100644 index 3281e21595b39..0000000000000 --- a/plugins/discovery-gce/licenses/httpclient-4.5.13.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e5f6cae5ca7ecaac1ec2827a9e2d65ae2869cada \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/httpclient-4.5.14.jar.sha1 b/plugins/discovery-gce/licenses/httpclient-4.5.14.jar.sha1 new file mode 100644 index 0000000000000..66e05851c2e3c --- /dev/null +++ b/plugins/discovery-gce/licenses/httpclient-4.5.14.jar.sha1 @@ -0,0 +1 @@ +1194890e6f56ec29177673f2f12d0b8e627dec98 \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/httpcore-4.4.15.jar.sha1 b/plugins/discovery-gce/licenses/httpcore-4.4.15.jar.sha1 deleted file mode 100644 index 42a03b5d7a376..0000000000000 --- a/plugins/discovery-gce/licenses/httpcore-4.4.15.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7f2e0c573eaa7a74bac2e89b359e1f73d92a0a1d \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/httpcore-4.4.16.jar.sha1 b/plugins/discovery-gce/licenses/httpcore-4.4.16.jar.sha1 new file mode 100644 index 0000000000000..172110694b5bd --- /dev/null +++ b/plugins/discovery-gce/licenses/httpcore-4.4.16.jar.sha1 @@ -0,0 +1 @@ +51cf043c87253c9f58b539c9f7e44c8894223850 \ No newline at end of file diff --git a/plugins/repository-hdfs/build.gradle b/plugins/repository-hdfs/build.gradle index 7ac54544f7a1b..3c83e535a91d8 100644 --- a/plugins/repository-hdfs/build.gradle +++ b/plugins/repository-hdfs/build.gradle @@ -66,7 +66,7 @@ dependencies { } api 'org.apache.htrace:htrace-core4:4.2.0-incubating' api "org.apache.logging.log4j:log4j-core:${versions.log4j}" - api 'org.apache.avro:avro:1.11.1' + api 'org.apache.avro:avro:1.11.2' api 'com.google.code.gson:gson:2.10.1' runtimeOnly "com.google.guava:guava:${versions.guava}" api "commons-logging:commons-logging:${versions.commonslogging}" diff --git a/plugins/repository-hdfs/licenses/avro-1.11.1.jar.sha1 b/plugins/repository-hdfs/licenses/avro-1.11.1.jar.sha1 deleted file mode 100644 index f03424516b44e..0000000000000 --- a/plugins/repository-hdfs/licenses/avro-1.11.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -81af5d4b9bdaaf4ba41bcb0df5241355ec34c630 \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/avro-1.11.2.jar.sha1 b/plugins/repository-hdfs/licenses/avro-1.11.2.jar.sha1 new file mode 100644 index 0000000000000..ce1a894e0ce6d --- /dev/null +++ b/plugins/repository-hdfs/licenses/avro-1.11.2.jar.sha1 @@ -0,0 +1 @@ +97e62e8be2b37e849f1bdb5a4f08121d47cc9806 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/httpclient-4.5.13.jar.sha1 b/plugins/repository-s3/licenses/httpclient-4.5.13.jar.sha1 deleted file mode 100644 index 3281e21595b39..0000000000000 --- a/plugins/repository-s3/licenses/httpclient-4.5.13.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e5f6cae5ca7ecaac1ec2827a9e2d65ae2869cada \ No newline at end of file diff --git a/plugins/repository-s3/licenses/httpclient-4.5.14.jar.sha1 b/plugins/repository-s3/licenses/httpclient-4.5.14.jar.sha1 new file mode 100644 index 0000000000000..66e05851c2e3c --- /dev/null +++ b/plugins/repository-s3/licenses/httpclient-4.5.14.jar.sha1 @@ -0,0 +1 @@ +1194890e6f56ec29177673f2f12d0b8e627dec98 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/httpcore-4.4.15.jar.sha1 
b/plugins/repository-s3/licenses/httpcore-4.4.15.jar.sha1 deleted file mode 100644 index 42a03b5d7a376..0000000000000 --- a/plugins/repository-s3/licenses/httpcore-4.4.15.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7f2e0c573eaa7a74bac2e89b359e1f73d92a0a1d \ No newline at end of file diff --git a/plugins/repository-s3/licenses/httpcore-4.4.16.jar.sha1 b/plugins/repository-s3/licenses/httpcore-4.4.16.jar.sha1 new file mode 100644 index 0000000000000..172110694b5bd --- /dev/null +++ b/plugins/repository-s3/licenses/httpcore-4.4.16.jar.sha1 @@ -0,0 +1 @@ +51cf043c87253c9f58b539c9f7e44c8894223850 \ No newline at end of file diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index cf749eeffd903..49ebce77a59ad 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; import org.opensearch.common.blobstore.BlobContainer; @@ -296,6 +297,35 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List blobs .build(); } + @Override + public void listBlobsByPrefixInSortedOrder( + String blobNamePrefix, + int limit, + BlobNameSortOrder blobNameSortOrder, + ActionListener> listener + ) { + // As AWS S3 returns list of keys in Lexicographic order, we don't have to fetch all the keys in order to sort them + // We fetch only keys as per the given limit to optimize the fetch. If provided sort order is not Lexicographic, + // we fall-back to default implementation of fetching all the keys and sorting them. + if (blobNameSortOrder != BlobNameSortOrder.LEXICOGRAPHIC) { + super.listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder, listener); + } else { + if (limit < 0) { + throw new IllegalArgumentException("limit should not be a negative value"); + } + String prefix = blobNamePrefix == null ? keyPath : buildKey(blobNamePrefix); + try (AmazonS3Reference clientReference = blobStore.clientReference()) { + List blobs = executeListing(clientReference, listObjectsRequest(prefix, limit), limit).stream() + .flatMap(listing -> listing.contents().stream()) + .map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size())) + .collect(Collectors.toList()); + listener.onResponse(blobs.subList(0, Math.min(limit, blobs.size()))); + } catch (final Exception e) { + listener.onFailure(new IOException("Exception when listing blobs by prefix [" + prefix + "]", e)); + } + } + } + @Override public Map listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException { String prefix = blobNamePrefix == null ? 
keyPath : buildKey(blobNamePrefix); @@ -339,10 +369,25 @@ public Map children() throws IOException { } private static List executeListing(AmazonS3Reference clientReference, ListObjectsV2Request listObjectsRequest) { + return executeListing(clientReference, listObjectsRequest, -1); + } + + private static List executeListing( + AmazonS3Reference clientReference, + ListObjectsV2Request listObjectsRequest, + int limit + ) { return SocketAccess.doPrivileged(() -> { final List results = new ArrayList<>(); + int totalObjects = 0; ListObjectsV2Iterable listObjectsIterable = clientReference.get().listObjectsV2Paginator(listObjectsRequest); - listObjectsIterable.forEach(results::add); + for (ListObjectsV2Response listObjectsV2Response : listObjectsIterable) { + results.add(listObjectsV2Response); + totalObjects += listObjectsV2Response.contents().size(); + if (limit != -1 && totalObjects > limit) { + break; + } + } return results; }); } @@ -356,6 +401,10 @@ private ListObjectsV2Request listObjectsRequest(String keyPath) { .build(); } + private ListObjectsV2Request listObjectsRequest(String keyPath, int limit) { + return listObjectsRequest(keyPath).toBuilder().maxKeys(Math.min(limit, 1000)).build(); + } + private String buildKey(String blobName) { return keyPath + blobName; } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index ec16f216f1777..a2a7ca8d8bdd5 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -33,6 +33,9 @@ package org.opensearch.repositories.s3; import org.mockito.ArgumentCaptor; +import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.blobstore.DeleteResult; @@ -74,9 +77,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -84,12 +91,12 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; public class S3BlobStoreContainerTests extends OpenSearchTestCase { @@ -187,26 +194,34 @@ private static class MockListObjectsV2ResponseIterator implements Iterator keysListed = new ArrayList<>(); + private final List keysListed; private final boolean throwExceptionOnNextInvocation; public MockListObjectsV2ResponseIterator(int totalPageCount, int s3ObjectsPerPage, long s3ObjectSize) { - this.totalPageCount = totalPageCount; - this.s3ObjectsPerPage = s3ObjectsPerPage; - this.s3ObjectSize = s3ObjectSize; 
- this.throwExceptionOnNextInvocation = false; + this(totalPageCount, s3ObjectsPerPage, s3ObjectSize, ""); + } + + public MockListObjectsV2ResponseIterator(int totalPageCount, int s3ObjectsPerPage, long s3ObjectSize, String blobPath) { + this(totalPageCount, s3ObjectsPerPage, s3ObjectSize, blobPath, false); } public MockListObjectsV2ResponseIterator( int totalPageCount, int s3ObjectsPerPage, long s3ObjectSize, + String blobPath, boolean throwExceptionOnNextInvocation ) { this.totalPageCount = totalPageCount; this.s3ObjectsPerPage = s3ObjectsPerPage; this.s3ObjectSize = s3ObjectSize; this.throwExceptionOnNextInvocation = throwExceptionOnNextInvocation; + keysListed = new ArrayList<>(); + for (int i = 0; i < totalPageCount * s3ObjectsPerPage; i++) { + keysListed.add(blobPath + UUID.randomUUID().toString()); + } + // S3 lists keys in lexicographic order + keysListed.sort(String::compareTo); } @Override @@ -220,11 +235,12 @@ public ListObjectsV2Response next() { throw SdkException.builder().build(); } if (currInvocationCount.getAndIncrement() < totalPageCount) { - String s3ObjectKey = UUID.randomUUID().toString(); - keysListed.add(s3ObjectKey); - return ListObjectsV2Response.builder() - .contents(Collections.nCopies(s3ObjectsPerPage, S3Object.builder().key(s3ObjectKey).size(s3ObjectSize).build())) - .build(); + List s3Objects = new ArrayList<>(); + for (int i = 0; i < s3ObjectsPerPage; i++) { + String s3ObjectKey = keysListed.get((currInvocationCount.get() - 1) * s3ObjectsPerPage + i); + s3Objects.add(S3Object.builder().key(s3ObjectKey).size(s3ObjectSize).build()); + } + return ListObjectsV2Response.builder().contents(s3Objects).build(); } throw new NoSuchElementException(); } @@ -232,6 +248,10 @@ public ListObjectsV2Response next() { public List getKeysListed() { return keysListed; } + + public int numberOfPagesFetched() { + return currInvocationCount.get(); + } } public void testDelete() throws IOException { @@ -273,10 +293,8 @@ public void testDelete() throws IOException { // keysDeleted will have blobPath also assertEquals(listObjectsV2ResponseIterator.getKeysListed().size(), keysDeleted.size() - 1); assertTrue(keysDeleted.contains(blobPath.buildAsString())); - assertArrayEquals( - listObjectsV2ResponseIterator.getKeysListed().toArray(String[]::new), - keysDeleted.stream().filter(key -> !blobPath.buildAsString().equals(key)).toArray(String[]::new) - ); + keysDeleted.remove(blobPath.buildAsString()); + assertEquals(new HashSet<>(listObjectsV2ResponseIterator.getKeysListed()), new HashSet<>(keysDeleted)); } public void testDeleteItemLevelErrorsDuringDelete() { @@ -772,4 +790,112 @@ private static void assertNumberOfMultiparts(final int expectedParts, final long assertEquals("Expected number of parts [" + expectedParts + "] but got [" + result.v1() + "]", expectedParts, (long) result.v1()); assertEquals("Expected remaining [" + expectedRemaining + "] but got [" + result.v2() + "]", expectedRemaining, (long) result.v2()); } + + public void testListBlobsByPrefix() throws IOException { + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + + final S3Client client = mock(S3Client.class); + final AmazonS3Reference clientReference = new AmazonS3Reference(client); + when(blobStore.clientReference()).thenReturn(clientReference); + + BlobPath blobPath = mock(BlobPath.class); + when(blobPath.buildAsString()).thenReturn("/dummy/path"); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); 
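+ // The mocked ListObjectsV2Iterable below stands in for the S3 SDK's ListObjectsV2 paginator; its iterator hands back keys that are pre-sorted, mirroring the lexicographic order in which S3 itself lists keys (the property the limited listing above relies on).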
+ + final ListObjectsV2Iterable listObjectsV2Iterable = mock(ListObjectsV2Iterable.class); + when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable); + + MockListObjectsV2ResponseIterator iterator = new MockListObjectsV2ResponseIterator(2, 5, 100); + when(listObjectsV2Iterable.iterator()).thenReturn(iterator); + + Map listOfBlobs = blobContainer.listBlobsByPrefix(null); + assertEquals(10, listOfBlobs.size()); + + Set keys = iterator.keysListed.stream() + .map(s -> s.substring(blobPath.buildAsString().length())) + .collect(Collectors.toSet()); + assertEquals(keys, listOfBlobs.keySet()); + } + + private void testListBlobsByPrefixInLexicographicOrder( + int limit, + int expectedNumberofPagesFetched, + BlobContainer.BlobNameSortOrder blobNameSortOrder + ) throws IOException { + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + + final S3Client client = mock(S3Client.class); + final AmazonS3Reference clientReference = new AmazonS3Reference(client); + when(blobStore.clientReference()).thenReturn(clientReference); + + BlobPath blobPath = mock(BlobPath.class); + when(blobPath.buildAsString()).thenReturn("/dummy/path"); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + final ListObjectsV2Iterable listObjectsV2Iterable = mock(ListObjectsV2Iterable.class); + when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable); + + final MockListObjectsV2ResponseIterator iterator = new MockListObjectsV2ResponseIterator(2, 5, 100, blobPath.buildAsString()); + when(listObjectsV2Iterable.iterator()).thenReturn(iterator); + + if (limit >= 0) { + blobContainer.listBlobsByPrefixInSortedOrder(null, limit, blobNameSortOrder, new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + int actualLimit = Math.max(0, Math.min(limit, 10)); + assertEquals(actualLimit, blobMetadata.size()); + + List keys = iterator.keysListed.stream() + .map(s -> s.substring(blobPath.buildAsString().length())) + .collect(Collectors.toList()); + Comparator keysComparator = String::compareTo; + if (blobNameSortOrder != BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC) { + keysComparator = Collections.reverseOrder(String::compareTo); + } + keys.sort(keysComparator); + List sortedKeys = keys.subList(0, actualLimit); + assertEquals(sortedKeys, blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList())); + assertEquals(expectedNumberofPagesFetched, iterator.numberOfPagesFetched()); + } + + @Override + public void onFailure(Exception e) { + fail("blobContainer.listBlobsByPrefixInLexicographicOrder failed with exception: " + e.getMessage()); + } + }); + } else { + assertThrows( + IllegalArgumentException.class, + () -> blobContainer.listBlobsByPrefixInSortedOrder(null, limit, blobNameSortOrder, new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) {} + + @Override + public void onFailure(Exception e) {} + }) + ); + } + } + + public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException { + testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithZeroLimit() throws IOException { + testListBlobsByPrefixInLexicographicOrder(0, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void 
testListBlobsByPrefixInLexicographicOrderWithLimitLessThanPageSize() throws IOException { + testListBlobsByPrefixInLexicographicOrder(2, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanPageSize() throws IOException { + testListBlobsByPrefixInLexicographicOrder(8, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberOfRecords() throws IOException { + testListBlobsByPrefixInLexicographicOrder(12, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } } diff --git a/plugins/telemetry-otel/build.gradle b/plugins/telemetry-otel/build.gradle index 7a56621be5f1e..2c275388cce38 100644 --- a/plugins/telemetry-otel/build.gradle +++ b/plugins/telemetry-otel/build.gradle @@ -54,3 +54,25 @@ thirdPartyAudit { 'io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider' ) } + +tasks.named("bundlePlugin").configure { + from('config/telemetry-otel') { + into 'config' + } +} + +tasks.register("writeTestJavaPolicy") { + doLast { + final File tmp = file("${buildDir}/tmp") + if (tmp.exists() == false && tmp.mkdirs() == false) { + throw new GradleException("failed to create temporary directory [${tmp}]") + } + final File javaPolicy = file("${tmp}/java.policy") + javaPolicy.write( + [ + "grant {", + " permission java.io.FilePermission \"config\", \"read\";", + "};" + ].join("\n")) + } +} diff --git a/plugins/telemetry-otel/config/telemetry-otel/log4j2.properties b/plugins/telemetry-otel/config/telemetry-otel/log4j2.properties new file mode 100644 index 0000000000000..544f42bd5513b --- /dev/null +++ b/plugins/telemetry-otel/config/telemetry-otel/log4j2.properties @@ -0,0 +1,27 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
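+# +# Note: this configuration is bundled into the plugin's config directory by the "bundlePlugin" task above; it routes spans emitted by the OpenTelemetry LoggingSpanExporter to a dedicated rolling *_otel_traces.log file, and additivity = false keeps those entries out of the main server log.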
+# + + +appender.tracing.type = RollingFile +appender.tracing.name = tracing +appender.tracing.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_otel_traces.log +appender.tracing.filePermissions = rw-r----- +appender.tracing.layout.type = PatternLayout +appender.tracing.layout.pattern = %m%n +appender.tracing.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_otel_traces-%i.log.gz +appender.tracing.policies.type = Policies +appender.tracing.policies.size.type = SizeBasedTriggeringPolicy +appender.tracing.policies.size.size = 1GB +appender.tracing.strategy.type = DefaultRolloverStrategy +appender.tracing.strategy.max = 4 + + +logger.exporter.name = io.opentelemetry.exporter.logging.LoggingSpanExporter +logger.exporter.level = INFO +logger.exporter.appenderRef.tracing.ref = tracing +logger.exporter.additivity = false diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java index 04bade9ec942a..292165979c2f2 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java @@ -42,7 +42,7 @@ private OTelResourceProvider() {} public static OpenTelemetry get(Settings settings) { return get( settings, - new LoggingSpanExporter(), + LoggingSpanExporter.create(), ContextPropagators.create(W3CTraceContextPropagator.getInstance()), Sampler.alwaysOn() ); diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index b867b90af333c..f1f469544f634 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -113,8 +113,6 @@ private void printClusterRouting() throws IOException, ParseException { /** * This test verifies that segment replication does not break when primary shards are on lower OS version. It does this * by verifying replica shards contains same number of documents as primary's. - * - * @throws Exception */ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { @@ -164,8 +162,6 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { * This test creates a cluster with primary on higher version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider}; * replica shard allocation on lower OpenSearch version is prevented. Thus, this test though cover the use case where * primary shard containing nodes are running on higher OS version while replicas are unassigned. 
- * - * @throws Exception */ public void testIndexingWithReplicaOnBwcNodes() throws Exception { if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java new file mode 100644 index 0000000000000..6691da81f057d --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java @@ -0,0 +1,166 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.indices.refresh.RefreshResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.cluster.coordination.FollowersChecker; +import org.opensearch.cluster.coordination.LeaderChecker; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterIndexHealth; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.shard.ShardNotFoundException; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.disruption.NetworkDisruption; +import org.opensearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) + +public class PrimaryTermValidationIT extends RemoteStoreBaseIntegTestCase { + + private static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + + public void testPrimaryTermValidation() throws Exception { + // Follower checker interval is lower compared to leader checker so that the cluster manager can remove the node + // with network partition faster. The follower check retry count is also kept 1. 
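+ // With a 1s follower check interval and timeout and a single retry, the cluster manager evicts the isolated primary within a few seconds, while the 20s leader check interval keeps the stale primary from noticing the partition first; that ordering is what lets the primary term validation below be exercised.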
Settings clusterSettings = Settings.builder() + .put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s") + .put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "20s") + .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 4) + .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "1s") + .put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "1s") + .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .build(); + internalCluster().startClusterManagerOnlyNode(clusterSettings); + + // Create repository + absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + + // Start data nodes and create index + internalCluster().startDataOnlyNodes(2, clusterSettings); + createIndex(INDEX_NAME, remoteTranslogIndexSettings(1)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + // Get the names of nodes to create network disruption + String primaryNode = primaryNodeName(INDEX_NAME); + String replicaNode = replicaNodeName(INDEX_NAME); + String clusterManagerNode = internalCluster().getClusterManagerName(); + logger.info("Node names : clusterManager={} primary={} replica={}", clusterManagerNode, primaryNode, replicaNode); + + // Index some docs and validate that both the primary and replica nodes have them. A refresh is triggered so that segment replication + // brings the replica up to date as well. + int numOfDocs = randomIntBetween(5, 10); + for (int i = 0; i < numOfDocs; i++) { + indexSameDoc(clusterManagerNode, INDEX_NAME); + } + refresh(INDEX_NAME); + assertBusy( + () -> assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), numOfDocs) + ); + assertBusy( + () -> assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), numOfDocs) + ); + + // Start network disruption - primary node will be isolated + Set<String> nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new)); + Set<String> nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new)); + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.DISCONNECT + ); + internalCluster().setDisruptionScheme(networkDisruption); + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + // Ensure the node which is partitioned is removed from the cluster + assertBusy(() -> { + NodesInfoResponse response = client(clusterManagerNode).admin().cluster().prepareNodesInfo().get(); + assertThat(response.getNodes().size(), equalTo(2)); + }); + + // Ensure that the cluster manager has latest information about the index + assertBusy(() -> { + ClusterHealthResponse clusterHealthResponse = client(clusterManagerNode).admin() + .cluster() + .health(new ClusterHealthRequest()) + .actionGet(TimeValue.timeValueSeconds(1)); + assertTrue(clusterHealthResponse.getIndices().containsKey(INDEX_NAME)); + ClusterIndexHealth clusterIndexHealth = clusterHealthResponse.getIndices().get(INDEX_NAME); + assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus()); + assertEquals(1, clusterIndexHealth.getNumberOfShards()); + assertEquals(1, clusterIndexHealth.getActiveShards()); +
assertEquals(1, clusterIndexHealth.getUnassignedShards()); + assertEquals(1, clusterIndexHealth.getActivePrimaryShards()); + assertEquals(ClusterHealthStatus.YELLOW, clusterIndexHealth.getStatus()); + }); + + // Index data to the newly promoted primary + indexSameDoc(clusterManagerNode, INDEX_NAME); + RefreshResponse refreshResponse = client(clusterManagerNode).admin() + .indices() + .prepareRefresh(INDEX_NAME) + .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED) + .execute() + .actionGet(); + assertNoFailures(refreshResponse); + assertEquals(1, refreshResponse.getSuccessfulShards()); + assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), numOfDocs + 1); + + // At this point we stop the disruption. Since the follower checker has already failed and cluster manager has removed the node + // from cluster, failed node needs to start discovery process by leader checker call. We stop the disruption to allow the failed + // node to + // communicate with the other node which it assumes has replica. + networkDisruption.stopDisrupting(); + + // When the index call is made to the stale primary, it makes the primary term validation call to the other node (which + // it assumes has the replica node). At this moment, the stale primary realises that it is no more the primary and the caller + // received the following exception. + ShardNotFoundException exception = assertThrows(ShardNotFoundException.class, () -> indexSameDoc(primaryNode, INDEX_NAME)); + assertTrue(exception.getMessage().contains("no such shard")); + ensureStableCluster(3); + ensureGreen(INDEX_NAME); + } + + private IndexResponse indexSameDoc(String nodeName, String indexName) { + return client(nodeName).prepareIndex(indexName) + .setId(UUIDs.randomBase64UUID()) + .setSource("{\"foo\" : \"bar\"}", XContentType.JSON) + .get(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index d226d0d757638..2b3fcadfc645e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -9,12 +9,12 @@ package org.opensearch.remotestore; import org.junit.After; -import org.junit.Before; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; +import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; @@ -74,6 +74,13 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) { return remoteStoreIndexSettings(numberOfReplicas, 1); } + protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) { + return Settings.builder() + .put(remoteStoreIndexSettings(numberOfReplicas)) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit) + .build(); + } + protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) { return Settings.builder() .put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards)) @@ -92,8 +99,7 @@ protected void putRepository(Path path) { ); } - @Before - public void 
setup() { + protected void setupRepo() { internalCluster().startClusterManagerOnlyNode(); absolutePath = randomRepoPath().toAbsolutePath(); assertAcked( diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 70a41d74a57c5..77de601b53ec6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.junit.Before; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; @@ -17,6 +18,7 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalTestCluster; @@ -50,6 +52,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class); } + @Before + public void setup() { + setupRepo(); + } + @Override public Settings indexSettings() { return remoteStoreIndexSettings(0); @@ -277,4 +284,42 @@ public void testRemoteSegmentCleanup() throws Exception { public void testRemoteTranslogCleanup() throws Exception { verifyRemoteStoreCleanup(true); } + + public void testStaleCommitDeletionWithInvokeFlush() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, true); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + // Delete is async. + assertBusy(() -> { + int actualFileCount = getFileCount(indexPath); + if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { + assertEquals(numberOfIterations, actualFileCount); + } else { + // As delete is async it's possible that the file gets created before the deletion or after + // deletion.
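+ // The 10..11 window asserted below tracks RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP (10 at the time of this change): cleanup retains the last 10 metadata files, and an in-flight refresh can momentarily leave one extra behind.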
+ assertTrue(actualFileCount >= 10 && actualFileCount <= 11); + } + }, 30, TimeUnit.SECONDS); + } + + public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, false); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + assertEquals(1, getFileCount(indexPath)); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 0ea87d106c14e..0e4774c1f3454 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.junit.Before; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; @@ -28,6 +29,11 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAME = "remote-store-test-idx-1"; + @Before + public void setup() { + setupRepo(); + } + public void testStatsResponseFromAllNodes() { // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java index 712747f7479ae..0f3e041dd429a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java @@ -9,6 +9,7 @@ package org.opensearch.remotestore; import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.junit.Before; import org.opensearch.action.admin.indices.close.CloseIndexResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -31,6 +32,11 @@ public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIntegTestCase { private int shard_count = 5; + @Before + public void setup() { + setupRepo(); + } + @Override public Settings indexSettings() { return Settings.builder() diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java index 9f492bbaee01a..e362b7f61e8e6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java @@ -183,8 +183,8 @@ public void testRestoreRemoteStoreIndicesWithoutRemoteTranslog() throws IOExcept public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnabled) throws IOException, ExecutionException, InterruptedException { - internalCluster().startClusterManagerOnlyNode(); - final String primaryNode =
internalCluster().startNode(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + String primary = internalCluster().startDataOnlyNode(); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -216,7 +216,7 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - final String secondNode = internalCluster().startNode(); + internalCluster().startDataOnlyNode(); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin() .cluster() @@ -273,10 +273,12 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2); // deleting data for restoredIndexName1 and restoring from remote store. - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(restoredIndexName1))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); ensureRed(restoredIndexName1); - assertAcked(client().admin().indices().prepareClose(restoredIndexName1)); - client().admin() + // Re-initialize client to make sure we are not using client from stopped node. + client = client(clusterManagerNode); + assertAcked(client.admin().indices().prepareClose(restoredIndexName1)); + client.admin() .cluster() .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(restoredIndexName1), PlainActionFuture.newFuture()); ensureYellowAndNoInitializingShards(restoredIndexName1); @@ -300,7 +302,7 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable assertEquals(restoreSnapshotResponse3.status(), RestStatus.ACCEPTED); ensureGreen(restoredIndexName1Seg); - GetIndexResponse getIndexResponse = client().admin() + GetIndexResponse getIndexResponse = client.admin() .indices() .getIndex(new GetIndexRequest().indices(restoredIndexName1Seg).includeDefaults(true)) .get(); @@ -331,7 +333,7 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable assertEquals(restoreSnapshotResponse4.status(), RestStatus.ACCEPTED); ensureGreen(restoredIndexName1Doc); - getIndexResponse = client().admin() + getIndexResponse = client.admin() .indices() .getIndex(new GetIndexRequest().indices(restoredIndexName1Doc).includeDefaults(true)) .get(); @@ -347,8 +349,8 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable } public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startNode(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + String primary = internalCluster().startDataOnlyNode(); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -378,7 +380,7 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - final String secondNode = internalCluster().startNode(); + internalCluster().startDataOnlyNode(); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin() .cluster() @@ -435,10 +437,12 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2); // deleting data for 
restoredIndexName1 and restoring from remote store. - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(indexName1))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); ensureRed(indexName1); - assertAcked(client().admin().indices().prepareClose(indexName1)); - client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1), PlainActionFuture.newFuture()); + // Re-initialize client to make sure we are not using client from stopped node. + client = client(clusterManagerNode); + assertAcked(client.admin().indices().prepareClose(indexName1)); + client.admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1), PlainActionFuture.newFuture()); ensureYellowAndNoInitializingShards(indexName1); ensureGreen(indexName1); assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); @@ -449,8 +453,8 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { } public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException { - internalCluster().startClusterManagerOnlyNode(); - final String primaryNode = internalCluster().startNode(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + String primary = internalCluster().startDataOnlyNode(); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -479,7 +483,7 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - final String secondNode = internalCluster().startNode(); + internalCluster().startDataOnlyNode(); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin() @@ -513,9 +517,11 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1); // deleting data for restoredIndexName1 and restoring from remote store. - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(restoredIndexName1))); - assertAcked(client().admin().indices().prepareClose(restoredIndexName1)); - client().admin() + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + // Re-initialize client to make sure we are not using client from stopped node. 
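+ // (the existing `client` handle was created before that node was stopped and may still be routed to it)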
+ client = client(clusterManagerNode); + assertAcked(client.admin().indices().prepareClose(restoredIndexName1)); + client.admin() .cluster() .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(restoredIndexName1), PlainActionFuture.newFuture()); ensureYellowAndNoInitializingShards(restoredIndexName1); diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index ac38768c9f3d3..e626824e7e271 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -32,10 +32,14 @@ package org.opensearch.common.blobstore; +import org.opensearch.action.ActionListener; + import java.io.IOException; import java.io.InputStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; @@ -191,4 +195,47 @@ default long readBlobPreferredLength() { * @throws IOException if there were any failures in reading from the blob container. */ Map listBlobsByPrefix(String blobNamePrefix) throws IOException; + + /** + * The type representing sort order of blob names + */ + enum BlobNameSortOrder { + + LEXICOGRAPHIC(Comparator.comparing(BlobMetadata::name)); + + final Comparator comparator; + + public Comparator comparator() { + return comparator; + } + + BlobNameSortOrder(final Comparator comparator) { + this.comparator = comparator; + } + } + + /** + * Lists all blobs in the container that match the specified prefix in lexicographic order + * @param blobNamePrefix The prefix to match against blob names in the container. + * @param limit Limits the result size to min(limit, number of keys) + * @param blobNameSortOrder Comparator to sort keys with + * @param listener the listener to be notified upon request completion + */ + default void listBlobsByPrefixInSortedOrder( + String blobNamePrefix, + int limit, + BlobNameSortOrder blobNameSortOrder, + ActionListener> listener + ) { + if (limit < 0) { + throw new IllegalArgumentException("limit should not be a negative value"); + } + try { + List blobNames = new ArrayList<>(listBlobsByPrefix(blobNamePrefix).values()); + blobNames.sort(blobNameSortOrder.comparator()); + listener.onResponse(blobNames.subList(0, Math.min(blobNames.size(), limit))); + } catch (Exception e) { + listener.onFailure(e); + } + } } diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java index 1423a30bbe307..56c9f0387b13e 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java @@ -16,10 +16,6 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.core.common.Strings; -import org.opensearch.core.xcontent.XContentParser; - -import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * This class handles the dependent extensions information @@ -60,39 +56,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVersion(version); } - public static ExtensionDependency parse(XContentParser parser) throws IOException { - String uniqueId = null; - Version version = null; - 
diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java index 1423a30bbe307..56c9f0387b13e 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java @@ -16,10 +16,6 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.core.common.Strings; -import org.opensearch.core.xcontent.XContentParser; - -import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * This class handles the dependent extensions information */ @@ -60,39 +56,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVersion(version); } - public static ExtensionDependency parse(XContentParser parser) throws IOException { - String uniqueId = null; - Version version = null; - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String fieldName = parser.currentName(); - parser.nextToken(); - - switch (fieldName) { - case UNIQUE_ID: - uniqueId = parser.text(); - break; - case VERSION: - try { - version = Version.fromString(parser.text()); - } catch (IllegalArgumentException e) { - throw e; - } - break; - default: - parser.skipChildren(); - break; - } - } - if (Strings.isNullOrEmpty(uniqueId)) { - throw new IOException("Required field [uniqueId] is missing in the request for the dependent extension"); - } else if (version == null) { - throw new IOException("Required field [version] is missing in the request for the dependent extension"); - } - return new ExtensionDependency(uniqueId, version); - - } - /** * The uniqueId of the dependency extension * diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 9987497b5fac0..cb22c8d864b1b 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -105,7 +105,7 @@ public static enum OpenSearchRequestType { /** * Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap. * - * @param additionalSettings Additional settings to read in from extensions.yml + * @param additionalSettings Additional settings to read in from extension initialization request * @throws IOException If the extensions discovery file is not properly retrieved. */ public ExtensionsManager(Set<Setting<?>> additionalSettings) throws IOException { @@ -504,4 +504,8 @@ void setAddSettingsUpdateConsumerRequestHandler(AddSettingsUpdateConsumerRequest Settings getEnvironmentSettings() { return environmentSettings; } + + public Set<Setting<?>> getAdditionalSettings() { + return this.additionalSettings; + } } diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java index e0806f8172278..f47f342617732 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java @@ -8,9 +8,14 @@ package org.opensearch.extensions.rest; +import org.opensearch.Version; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.extensions.ExtensionDependency; import org.opensearch.extensions.ExtensionScopedSettings; import org.opensearch.extensions.ExtensionsManager; @@ -23,12 +28,16 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; -import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestRequest.Method.POST; /** @@ -62,36 +71,79 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client String openSearchVersion = null; String minimumCompatibleVersion = null; List<ExtensionDependency> dependencies = new ArrayList<>(); + Set<String> additionalSettingsKeys = extensionsManager.getAdditionalSettings() + .stream() + .map(s -> s.getKey()) + .collect(Collectors.toSet()); - try (XContentParser parser = request.contentParser()) { - parser.nextToken(); - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String currentFieldName = parser.currentName(); - parser.nextToken(); - if ("name".equals(currentFieldName)) { - name = parser.text(); - } else if ("uniqueId".equals(currentFieldName)) { - uniqueId = parser.text(); - } else if ("hostAddress".equals(currentFieldName)) { - hostAddress = parser.text(); - } else if ("port".equals(currentFieldName)) { - port = parser.text(); - } else if ("version".equals(currentFieldName)) { - version = parser.text(); - } else if ("opensearchVersion".equals(currentFieldName)) { - openSearchVersion = parser.text(); - } else if ("minimumCompatibleVersion".equals(currentFieldName)) { - minimumCompatibleVersion = parser.text(); - } else if ("dependencies".equals(currentFieldName)) { - ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - dependencies.add(ExtensionDependency.parse(parser)); + Tuple<? extends MediaType, Map<String, Object>> unreadExtensionTuple = XContentHelper.convertToMap( + request.content(), + false, + request.getXContentType().xContent().mediaType() + ); + Map<String, Object> extensionMap = unreadExtensionTuple.v2(); + + ExtensionScopedSettings extAdditionalSettings = new ExtensionScopedSettings(extensionsManager.getAdditionalSettings()); + + try { + // Check whether any required fields are missing from the extension initialization request + String[] requiredFields = { + "name", + "uniqueId", + "hostAddress", + "port", + "version", + "opensearchVersion", + "minimumCompatibleVersion" }; + List<String> missingFields = Arrays.stream(requiredFields) + .filter(field -> !extensionMap.containsKey(field)) + .collect(Collectors.toList()); + if (!missingFields.isEmpty()) { + throw new IOException("Extension is missing these required fields: " + missingFields); + } + + // Parse extension dependencies + List<ExtensionDependency> extensionDependencyList = new ArrayList<>(); + if (extensionMap.get("dependencies") != null) { + List<HashMap<String, ?>> extensionDependencies = new ArrayList<>( + (Collection<HashMap<String, ?>>) extensionMap.get("dependencies") + ); + for (HashMap<String, ?> dependency : extensionDependencies) { + if (Strings.isNullOrEmpty((String) dependency.get("uniqueId"))) { + throw new IOException("Required field [uniqueId] is missing in the request for the dependent extension"); + } else if (dependency.get("version") == null) { + throw new IOException("Required field [version] is missing in the request for the dependent extension"); } + extensionDependencyList.add( + new ExtensionDependency( + dependency.get("uniqueId").toString(), + Version.fromString(dependency.get("version").toString()) + ) + ); } } + + Map<String, Object> additionalSettingsMap = extensionMap.entrySet() + .stream() + .filter(kv -> additionalSettingsKeys.contains(kv.getKey())) + .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); + + Settings.Builder output = Settings.builder();
+ output.loadFromMap(additionalSettingsMap); + extAdditionalSettings.applySettings(output.build()); + + // Create the extension read from the initialization request + name = extensionMap.get("name").toString(); + uniqueId = extensionMap.get("uniqueId").toString(); + hostAddress = extensionMap.get("hostAddress").toString(); + port = extensionMap.get("port").toString(); + version = extensionMap.get("version").toString(); + openSearchVersion = extensionMap.get("opensearchVersion").toString(); + minimumCompatibleVersion = extensionMap.get("minimumCompatibleVersion").toString(); + dependencies = extensionDependencyList; } catch (IOException e) { - throw new IOException("Missing attribute", e); + logger.warn("Loading extension failed due to exception: " + e.getMessage()); + return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); } Extension extension = new Extension( @@ -103,8 +155,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client openSearchVersion, minimumCompatibleVersion, dependencies, - // TODO add this to the API (https://github.com/opensearch-project/OpenSearch/issues/8032) - new ExtensionScopedSettings(Collections.emptySet()) + extAdditionalSettings ); try { extensionsManager.loadExtension(extension);
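The additional-settings plumbing above reduces to a small filter-then-load flow. A hedged, self-contained sketch of the same idea (the helper name and the example keys/values are hypothetical; only the stream filter and the Settings builder mirror the action code):

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;
    import org.opensearch.common.settings.Settings;

    class AdditionalSettingsSketch {
        // Keep only the keys the extension declared, then load them into a Settings object.
        static Settings filter(Map<String, Object> extensionMap, Set<String> additionalSettingsKeys) {
            Map<String, Object> filtered = extensionMap.entrySet()
                .stream()
                .filter(kv -> additionalSettingsKeys.contains(kv.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            return Settings.builder().loadFromMap(filtered).build();
        }
    }
    // filter(Map.of("uniqueId", "ad-extension", "boolSetting", true), Set.of("boolSetting"))
    // yields a Settings containing only boolSetting=true; undeclared keys are dropped.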
diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java new file mode 100644 index 0000000000000..1eeadfe228a45 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import java.util.Arrays; + +/** + * Utils for remote store + * + * @opensearch.internal + */ +public class RemoteStoreUtils { + public static final int LONG_MAX_LENGTH = String.valueOf(Long.MAX_VALUE).length(); + + /** + * This method subtracts the given number from Long.MAX_VALUE and returns a string representation of the result. + * The resultant string is guaranteed to be of the same length as that of Long.MAX_VALUE. If shorter, we add left padding + * of 0s to the string. + * @param num number to get the inverted long string for + * @return String value of Long.MAX_VALUE - num + */ + public static String invertLong(long num) { + if (num < 0) { + throw new IllegalArgumentException("Negative long values are not allowed"); + } + String invertedLong = String.valueOf(Long.MAX_VALUE - num); + char[] characterArray = new char[LONG_MAX_LENGTH - invertedLong.length()]; + Arrays.fill(characterArray, '0'); + + return new String(characterArray) + invertedLong; + } + + /** + * This method converts the given string into a long and subtracts it from Long.MAX_VALUE + * @param str long in string format to be inverted + * @return long value of the invert result + */ + public static long invertLong(String str) { + long num = Long.parseLong(str); + if (num < 0) { + throw new IllegalArgumentException("Strings representing negative long values are not allowed"); + } + return Long.MAX_VALUE - num; + } +}
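Because every inverted value is left-padded to the 19-digit width of Long.MAX_VALUE, ascending lexicographic order over the inverted strings equals descending numeric order over the inputs, which is what lets callers list the newest (highest-numbered) remote store files first with a plain sorted prefix listing. A quick worked example against the class above:

    String a = RemoteStoreUtils.invertLong(1234L);          // "9223372036854774573" (Long.MAX_VALUE - 1234)
    String b = RemoteStoreUtils.invertLong(Long.MAX_VALUE); // "0000000000000000000" (zero, padded to 19 chars)
    assert a.length() == 19 && b.length() == 19;            // fixed width, so string order == numeric order
    assert b.compareTo(a) < 0;                              // the larger input sorts first
    assert RemoteStoreUtils.invertLong(a) == 1234L;         // the two overloads are exact inverses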
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d89d51c713d70..01c0a12d463ea 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2632,7 +2632,7 @@ public void restoreFromSnapshotAndRemoteStore( assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + recoveryState.getRecoverySource(); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener); + storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 7cfaaafcadd39..ddca12d9283f3 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -85,7 +85,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres // Visible for testing static final Set<String> EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing - static final int LAST_N_METADATA_FILES_TO_KEEP = 10; + public static final int LAST_N_METADATA_FILES_TO_KEEP = 10; private final IndexShard indexShard; private final Directory storeDirectory; @@ -200,9 +200,8 @@ private synchronized void syncSegments(boolean isRetry) { // if a new segments_N file is present in local that is not uploaded to remote store yet, it // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. // This is done to avoid delete post each refresh. - // Ideally, we want this to be done in async flow. (GitHub issue #4315) if (isRefreshAfterCommit()) { - deleteStaleCommits(); + remoteDirectory.deleteStaleSegmentsAsync(LAST_N_METADATA_FILES_TO_KEEP); } try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { @@ -381,14 +380,6 @@ private String getChecksumOfLocalFile(String file) throws IOException { return localSegmentChecksumMap.get(file); } - private void deleteStaleCommits() { - try { - remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); - } catch (IOException e) { - logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); - } - } - /** * Updates the last refresh time and refresh seq no which is seen by local store. */ diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 119524e8caf8a..da4e9113143af 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -69,6 +69,7 @@ import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.channels.FileChannel; @@ -356,7 +357,8 @@ void recoverFromSnapshotAndRemoteStore( final IndexShard indexShard, Repository repository, RepositoriesService repositoriesService, - ActionListener<Boolean> listener + ActionListener<Boolean> listener, + ThreadPool threadPool ) { try { if (canRecover(indexShard)) { @@ -384,7 +386,10 @@ void recoverFromSnapshotAndRemoteStore( remoteStoreRepository = shallowCopyShardMetadata.getRemoteStoreRepository(); } - RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService); + RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory( + () -> repositoriesService, + threadPool + ); RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory( remoteStoreRepository, indexUUID, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index addd8a24af9c5..ac129aca8baf7 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -27,6 +27,7 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.threadpool.ThreadPool; import java.io.FileNotFoundException; import java.io.IOException; @@ -75,6 +76,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final RemoteStoreLockManager mdLockManager; + private final ThreadPool threadPool; + /** * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation * This is achieved by uploading refresh metadata file with the same UUID suffix.
@@ -96,15 +99,23 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class); + /** + * AtomicBoolean that ensures only one staleCommitDeletion activity is scheduled at a time. + * Visible for testing + */ + protected final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true); + public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, - RemoteStoreLockManager mdLockManager + RemoteStoreLockManager mdLockManager, + ThreadPool threadPool ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; + this.threadPool = threadPool; init(); } @@ -574,7 +585,7 @@ public Map getSegmentsUploadedToRemoteStore(lon * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store */ - public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); List<String> sortedMetadataFileList = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList()); if (sortedMetadataFileList.size() <= lastNMetadataFilesToKeep) { @@ -656,6 +667,33 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } } + /** + * Delete stale segment and metadata files asynchronously. + * This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner. + * @param lastNMetadataFilesToKeep number of metadata files to keep + */ + public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { + if (canDeleteStaleCommits.compareAndSet(true, false)) { + try { + threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + try { + deleteStaleSegments(lastNMetadataFilesToKeep); + } catch (Exception e) { + logger.info( + "Exception while deleting stale commits from remote segment store, will retry delete post next commit", + e + ); + } finally { + canDeleteStaleCommits.set(true); + } + }); + } catch (Exception e) { + logger.info("Exception occurred while scheduling deleteStaleCommits", e); + canDeleteStaleCommits.set(true); + } + } + } +
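The compareAndSet gate above is a single-flight scheduler: triggers arriving while a purge is pending are dropped, and the gate reopens in a finally block, or immediately if the submission itself fails. The same pattern in isolation, as a hedged, self-contained sketch with hypothetical names rather than the OpenSearch types:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.atomic.AtomicBoolean;

    class SingleFlightScheduler {
        private final AtomicBoolean idle = new AtomicBoolean(true);

        void scheduleOnce(ExecutorService pool, Runnable task) {
            if (idle.compareAndSet(true, false)) { // claim the single slot
                try {
                    pool.execute(() -> {
                        try {
                            task.run();
                        } finally {
                            idle.set(true); // reopen the gate once the run finishes
                        }
                    });
                } catch (Exception e) {
                    idle.set(true); // reopen if scheduling itself failed
                }
            }
            // else: a run is already pending or executing, so this trigger is dropped
        }
    }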
/* Tries to delete shard level directory if it is empty. Return true if it deleted it successfully. @@ -680,7 +718,7 @@ private boolean deleteIfEmpty() throws IOException { } public void close() throws IOException { - deleteStaleSegments(0); + deleteStaleSegmentsAsync(0); deleteIfEmpty(); } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 03995d5913fb3..3bec84f287ce4 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -20,6 +20,7 @@ import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.function.Supplier; @@ -34,8 +35,11 @@ public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.Dire private final Supplier<RepositoriesService> repositoriesService; - public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> repositoriesService) { + private final ThreadPool threadPool; + + public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; + this.threadPool = threadPool; } @Override @@ -62,7 +66,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh shardId ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 3742c817118da..aecf5659de7fe 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -717,7 +717,8 @@ protected Node( clusterService.setRerouteService(rerouteService); final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( - repositoriesServiceReference::get + repositoriesServiceReference::get, + threadPool ); final IndicesService indicesService = new IndicesService( @@ -1500,7 +1501,7 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(NodeEnvironment.class)); toClose.add(stopWatch::stop); if (FeatureFlags.isEnabled(TELEMETRY)) { - toClose.add(() -> injector.getInstance(TracerFactory.class)); +
toClose.add(injector.getInstance(TracerFactory.class)); } if (logger.isTraceEnabled()) { diff --git a/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java index 6c36368bfe446..f139a5d4e3bb1 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java @@ -34,6 +34,9 @@ import org.apache.lucene.tests.mockfile.FilterFileSystemProvider; import org.apache.lucene.tests.mockfile.FilterSeekableByteChannel; import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.io.PathUtils; import org.opensearch.common.io.PathUtilsForTesting; @@ -54,10 +57,14 @@ import java.nio.file.Path; import java.nio.file.attribute.FileAttribute; import java.nio.file.spi.FileSystemProvider; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Locale; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -118,6 +125,79 @@ public void testIsTempBlobName() { assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true)); } + private void testListBlobsByPrefixInSortedOrder(int limit, BlobContainer.BlobNameSortOrder blobNameSortOrder) throws IOException { + + final Path path = PathUtils.get(createTempDir().toString()); + + List blobsInFileSystem = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT); + final byte[] blobData = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 
512 : 1 << 20)); // rarely up to 1mb + Files.write(path.resolve(blobName), blobData); + blobsInFileSystem.add(blobName); + } + + final FsBlobContainer container = new FsBlobContainer( + new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false), + BlobPath.cleanPath(), + path + ); + + if (limit >= 0) { + container.listBlobsByPrefixInSortedOrder(null, limit, blobNameSortOrder, new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + int actualLimit = Math.min(limit, 10); + assertEquals(actualLimit, blobMetadata.size()); + + if (blobNameSortOrder == BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC) { + blobsInFileSystem.sort(String::compareTo); + } else { + blobsInFileSystem.sort(Collections.reverseOrder(String::compareTo)); + } + List keys = blobsInFileSystem.subList(0, actualLimit); + assertEquals(keys, blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList())); + } + + @Override + public void onFailure(Exception e) { + fail("blobContainer.listBlobsByPrefixInLexicographicOrder failed with exception: " + e.getMessage()); + } + }); + } else { + assertThrows( + IllegalArgumentException.class, + () -> container.listBlobsByPrefixInSortedOrder(null, limit, blobNameSortOrder, new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) {} + + @Override + public void onFailure(Exception e) {} + }) + ); + } + } + + public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException { + testListBlobsByPrefixInSortedOrder(-5, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithZeroLimit() throws IOException { + testListBlobsByPrefixInSortedOrder(0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitLessThanNumberOfRecords() throws IOException { + testListBlobsByPrefixInSortedOrder(8, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitNumberOfRecords() throws IOException { + testListBlobsByPrefixInSortedOrder(10, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberOfRecords() throws IOException { + testListBlobsByPrefixInSortedOrder(12, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + static class MockFileSystemProvider extends FilterFileSystemProvider { final Consumer onRead; diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index f8ec138d8eff2..713a70c6a7d3e 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -45,8 +45,6 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.env.EnvironmentSettingsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -398,16 +396,6 @@ public void testExtensionDependency() throws Exception { } } - public void testParseExtensionDependency() throws Exception { - XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"uniqueId\": \"test1\", \"version\": \"2.0.0\"}"); - - 
assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken()); - ExtensionDependency dependency = ExtensionDependency.parse(parser); - - assertEquals("test1", dependency.getUniqueId()); - assertEquals(Version.fromString("2.0.0"), dependency.getVersion()); - } - public void testInitialize() throws Exception { ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java index 8d027b7fca9c2..7dd616c678e74 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java @@ -9,24 +9,33 @@ package org.opensearch.extensions.rest; import java.util.Collections; +import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import org.junit.After; import org.junit.Before; +import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.extensions.ExtensionsSettings; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestStatus; @@ -88,8 +97,12 @@ public void testRestInitializeExtensionActionResponse() throws Exception { ExtensionsManager extensionsManager = mock(ExtensionsManager.class); RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(extensionsManager); final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"ad-extension\",\"hostAddress\":\"127.0.0.1\"," - + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"3.0.0\"," - + "\"minimumCompatibleVersion\":\"3.0.0\"}"; + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" + + Version.CURRENT.toString() + + "\"," + + "\"minimumCompatibleVersion\":\"" + + Version.CURRENT.minimumCompatibilityVersion().toString() + + "\"}"; RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) .withMethod(RestRequest.Method.POST) .build(); @@ -106,8 +119,12 @@ public void testRestInitializeExtensionActionFailure() throws Exception { RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(extensionsManager); final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"\",\"hostAddress\":\"127.0.0.1\"," - + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"3.0.0\"," - + "\"minimumCompatibleVersion\":\"3.0.0\"}"; + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" + + Version.CURRENT.toString() + + "\"," + + 
"\"minimumCompatibleVersion\":\"" + + Version.CURRENT.minimumCompatibilityVersion().toString() + + "\"}"; RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) .withMethod(RestRequest.Method.POST) .build(); @@ -121,4 +138,98 @@ public void testRestInitializeExtensionActionFailure() throws Exception { ); } + public void testRestInitializeExtensionActionResponseWithAdditionalSettings() throws Exception { + Setting boolSetting = Setting.boolSetting("boolSetting", false, Setting.Property.ExtensionScope); + Setting stringSetting = Setting.simpleString("stringSetting", "default", Setting.Property.ExtensionScope); + Setting intSetting = Setting.intSetting("intSetting", 0, Setting.Property.ExtensionScope); + Setting listSetting = Setting.listSetting( + "listSetting", + List.of("first", "second", "third"), + Function.identity(), + Setting.Property.ExtensionScope + ); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of(boolSetting, stringSetting, intSetting, listSetting)); + ExtensionsManager spy = spy(extensionsManager); + + // optionally, you can stub out some methods: + when(spy.getAdditionalSettings()).thenCallRealMethod(); + Mockito.doCallRealMethod().when(spy).loadExtension(any(ExtensionsSettings.Extension.class)); + Mockito.doNothing().when(spy).initialize(); + RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(spy); + final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"ad-extension\",\"hostAddress\":\"127.0.0.1\"," + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" + + Version.CURRENT.toString() + + "\"," + + "\"minimumCompatibleVersion\":\"" + + Version.CURRENT.minimumCompatibilityVersion().toString() + + "\",\"boolSetting\":true,\"stringSetting\":\"customSetting\",\"intSetting\":5,\"listSetting\":[\"one\",\"two\",\"three\"]}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(); + + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + restInitializeExtensionAction.handleRequest(request, channel, null); + + assertEquals(channel.capturedResponse().status(), RestStatus.ACCEPTED); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("A request to initialize an extension has been sent.")); + + Optional extension = spy.lookupExtensionSettingsById("ad-extension"); + assertTrue(extension.isPresent()); + assertEquals(true, extension.get().getAdditionalSettings().get(boolSetting)); + assertEquals("customSetting", extension.get().getAdditionalSettings().get(stringSetting)); + assertEquals(5, extension.get().getAdditionalSettings().get(intSetting)); + + List listSettingValue = (List) extension.get().getAdditionalSettings().get(listSetting); + assertTrue(listSettingValue.contains("one")); + assertTrue(listSettingValue.contains("two")); + assertTrue(listSettingValue.contains("three")); + } + + public void testRestInitializeExtensionActionResponseWithAdditionalSettingsUsingDefault() throws Exception { + Setting boolSetting = Setting.boolSetting("boolSetting", false, Setting.Property.ExtensionScope); + Setting stringSetting = Setting.simpleString("stringSetting", "default", Setting.Property.ExtensionScope); + Setting intSetting = Setting.intSetting("intSetting", 0, Setting.Property.ExtensionScope); + Setting listSetting = Setting.listSetting( + "listSetting", + List.of("first", 
"second", "third"), + Function.identity(), + Setting.Property.ExtensionScope + ); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of(boolSetting, stringSetting, intSetting, listSetting)); + ExtensionsManager spy = spy(extensionsManager); + + // optionally, you can stub out some methods: + when(spy.getAdditionalSettings()).thenCallRealMethod(); + Mockito.doCallRealMethod().when(spy).loadExtension(any(ExtensionsSettings.Extension.class)); + Mockito.doNothing().when(spy).initialize(); + RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(spy); + final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"ad-extension\",\"hostAddress\":\"127.0.0.1\"," + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" + + Version.CURRENT.toString() + + "\"," + + "\"minimumCompatibleVersion\":\"" + + Version.CURRENT.minimumCompatibilityVersion().toString() + + "\"}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(); + + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + restInitializeExtensionAction.handleRequest(request, channel, null); + + assertEquals(channel.capturedResponse().status(), RestStatus.ACCEPTED); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("A request to initialize an extension has been sent.")); + + Optional extension = spy.lookupExtensionSettingsById("ad-extension"); + assertTrue(extension.isPresent()); + assertEquals(false, extension.get().getAdditionalSettings().get(boolSetting)); + assertEquals("default", extension.get().getAdditionalSettings().get(stringSetting)); + assertEquals(0, extension.get().getAdditionalSettings().get(intSetting)); + + List listSettingValue = (List) extension.get().getAdditionalSettings().get(listSetting); + assertTrue(listSettingValue.contains("first")); + assertTrue(listSettingValue.contains("second")); + assertTrue(listSettingValue.contains("third")); + } + } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index d9d87196ca289..32b8fb5a4dc62 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -252,7 +252,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), translogFactorySupplier ); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java new file mode 100644 index 0000000000000..5b9135afb66f3 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.remote; + +import org.opensearch.test.OpenSearchTestCase; + +public class RemoteStoreUtilsTests extends OpenSearchTestCase { + + public void testInvertToStrInvalid() { + assertThrows(IllegalArgumentException.class, () -> RemoteStoreUtils.invertLong(-1)); + } + + public void testInvertToStrValid() { + assertEquals("9223372036854774573", RemoteStoreUtils.invertLong(1234)); + assertEquals("0000000000000001234", RemoteStoreUtils.invertLong(9223372036854774573L)); + } + + public void testInvertToLongInvalid() { + assertThrows(IllegalArgumentException.class, () -> RemoteStoreUtils.invertLong("-5")); + } + + public void testInvertToLongValid() { + assertEquals(1234, RemoteStoreUtils.invertLong("9223372036854774573")); + assertEquals(9223372036854774573L, RemoteStoreUtils.invertLong("0000000000000001234")); + } + + public void testInvert() { + assertEquals(0, RemoteStoreUtils.invertLong(RemoteStoreUtils.invertLong(0))); + assertEquals(Long.MAX_VALUE, RemoteStoreUtils.invertLong(RemoteStoreUtils.invertLong(Long.MAX_VALUE))); + for (int i = 0; i < 10; i++) { + long num = randomLongBetween(1, Long.MAX_VALUE); + assertEquals(num, RemoteStoreUtils.invertLong(RemoteStoreUtils.invertLong(num))); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 7a9cbc12d823b..324315505987b 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -24,6 +24,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -41,14 +42,16 @@ public class RemoteSegmentStoreDirectoryFactoryTests extends OpenSearchTestCase private Supplier<RepositoriesService> repositoriesServiceSupplier; private RepositoriesService repositoriesService; + private ThreadPool threadPool; private RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory; @Before public void setup() { repositoriesServiceSupplier = mock(Supplier.class); repositoriesService = mock(RepositoriesService.class); + threadPool = mock(ThreadPool.class); when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier); + remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier, threadPool); } public void testNewDirectory() throws IOException {
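One detail worth calling out before the next test file: the async deletion path is made deterministic there by stubbing the REMOTE_PURGE executor with a same-thread executor, as the setup below does. The essential technique, sketched with the Mockito and OpenSearch test utilities imported in the hunks that follow:

    ThreadPool threadPool = mock(ThreadPool.class);
    when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(OpenSearchExecutors.newDirectExecutorService());
    // Tasks submitted to REMOTE_PURGE now run synchronously on the calling thread, so a test can
    // invoke deleteStaleSegmentsAsync(...) and assert immediately; the assertBusy(...) checks
    // in the tests below then pass on the first poll.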
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 3417e7b0aee04..66e4b9a357b85 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -30,12 +30,14 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.NoSuchFileException; @@ -45,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,6 +59,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doReturn; +import static org.hamcrest.CoreMatchers.is; public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteDirectory remoteDataDirectory; @@ -65,21 +69,31 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; private IndexShard indexShard; private SegmentInfos segmentInfos; + private ThreadPool threadPool; @Before public void setup() throws IOException { remoteDataDirectory = mock(RemoteDirectory.class); remoteMetadataDirectory = mock(RemoteDirectory.class); mdLockManager = mock(RemoteStoreMetadataLockManager.class); + threadPool = mock(ThreadPool.class); - remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, mdLockManager); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool + ); Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build(); + ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); try (Store store = indexShard.store()) { segmentInfos = store.readLastCommittedSegmentsInfo(); } + + when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); } @After @@ -766,41 +780,76 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException { assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); } - public void testDeleteStaleCommitsException() throws IOException { + public void testDeleteStaleCommitsException() throws Exception { + populateMetadata(); when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( new IOException("Error reading") ); - assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteStaleSegments(5)); + // populateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); + } + + public void testDeleteStaleCommitsExceptionWhileScheduling() throws Exception { + populateMetadata(); + doThrow(new IllegalArgumentException()).when(threadPool).executor(any(String.class)); + + // populateMetadata() adds stub to return 3 metadata files + // We are passing
lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); + } + + public void testDeleteStaleCommitsWithDeletionAlreadyInProgress() throws Exception { + populateMetadata(); + remoteSegmentStoreDirectory.canDeleteStaleCommits.set(false); + + // populateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that while a deletion is already in progress, deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(false))); + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); } - public void testDeleteStaleCommitsWithinThreshold() throws IOException { + public void testDeleteStaleCommitsWithinThreshold() throws Exception { populateMetadata(); // populateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=5 here so that none of the metadata files will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(5); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5); + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT)); } - public void testDeleteStaleCommitsActualDelete() throws IOException { + public void testDeleteStaleCommitsActualDelete() throws Exception { Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); // populateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that the oldest metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); } - public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { + public void testDeleteStaleCommitsActualDeleteIOException() throws Exception { Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -813,17 +862,18 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(segmentFileWithException); // populateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that the oldest metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() ->
assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).deleteFile("metadata__1__5__abc"); } - public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOException { + public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Exception { Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -836,13 +886,14 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOExc doThrow(new NoSuchFileException(segmentFileWithException)).when(remoteDataDirectory).deleteFile(segmentFileWithException); // populateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that the oldest metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 0bb2b604e8f1a..88899a1b282af 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1835,7 +1835,7 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, fileCacheCleaner ); diff --git a/settings.gradle b/settings.gradle index bf899d04c1e08..d908e941b3d6b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -87,7 +87,8 @@ List projects = [ 'test:fixtures:minio-fixture', 'test:fixtures:old-elasticsearch', 'test:fixtures:s3-fixture', - 'test:logger-usage' + 'test:logger-usage', + 'test:telemetry' ] /** diff --git a/test/framework/build.gradle b/test/framework/build.gradle index 255f554db7a79..2532fdf1938fd 100644 --- a/test/framework/build.gradle +++ b/test/framework/build.gradle @@ -38,6 +38,7 @@ dependencies { api project(':libs:opensearch-nio') api project(":server") api project(":libs:opensearch-cli") + api project(":test:telemetry") api "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" api "junit:junit:${versions.junit}" api "org.hamcrest:hamcrest:${versions.hamcrest}" diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ea9e9342673db..7f3819563dcbd 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -666,7 +666,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) ); - return
new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException { diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index caa5b90016740..fec45219ace81 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -109,6 +109,7 @@ import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; @@ -152,6 +153,7 @@ import org.opensearch.test.disruption.ServiceDisruptionScheme; import org.opensearch.test.store.MockFSIndexStore; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.test.telemetry.MockTelemetryPlugin; import org.opensearch.transport.TransportInterceptor; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; @@ -776,6 +778,7 @@ protected Settings featureFlagSettings() { for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) { featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY)); } + featureSettings.put(FeatureFlags.TELEMETRY_SETTING.getKey(), true); return featureSettings.build(); } @@ -2101,6 +2104,7 @@ protected Collection> getMockPlugins() { if (addMockGeoShapeFieldMapper()) { mocks.add(TestGeoShapeFieldMapperPlugin.class); } + mocks.add(MockTelemetryPlugin.class); return Collections.unmodifiableList(mocks); } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java index bf797ef6b310b..91c48f1679f9a 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java @@ -48,6 +48,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.Strings; @@ -66,6 +67,8 @@ import org.opensearch.plugins.Plugin; import org.opensearch.script.MockScriptService; import org.opensearch.search.internal.SearchContext; +import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.test.telemetry.MockTelemetryPlugin; import org.opensearch.transport.TransportSettings; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -242,6 +245,8 @@ private Node newNode() { .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes .putList(INITIAL_CLUSTER_MANAGER_NODES_SETTING.getKey(), nodeName) + .put(FeatureFlags.TELEMETRY_SETTING.getKey(), true) + 
.put(TelemetrySettings.TRACER_ENABLED_SETTING.getKey(), true) .put(nodeSettings()) // allow test cases to provide their own settings or override these .build(); @@ -254,6 +259,7 @@ private Node newNode() { plugins.add(MockHttpTransport.TestPlugin.class); } plugins.add(MockScriptService.TestPlugin.class); + plugins.add(MockTelemetryPlugin.class); Node node = new MockNode(settings, plugins, forbidPrivateIndexSettings()); try { node.start(); diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java new file mode 100644 index 0000000000000..c02ab1d737303 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test.telemetry; + +import org.opensearch.telemetry.Telemetry; +import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.telemetry.metrics.MetricsTelemetry; +import org.opensearch.test.telemetry.tracing.MockTracingTelemetry; +import org.opensearch.telemetry.tracing.TracingTelemetry; + +/** + * Mock {@link Telemetry} implementation for testing. + */ +public class MockTelemetry implements Telemetry { + + private final TelemetrySettings settings; + + /** + * Constructor with settings. + * @param settings telemetry settings. + */ + public MockTelemetry(TelemetrySettings settings) { + this.settings = settings; + } + + @Override + public TracingTelemetry getTracingTelemetry() { + return new MockTracingTelemetry(); + } + + @Override + public MetricsTelemetry getMetricsTelemetry() { + return new MetricsTelemetry() { + }; + } +} diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java new file mode 100644 index 0000000000000..41cc5c1e77a34 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test.telemetry; + +import java.util.Optional; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.TelemetryPlugin; +import org.opensearch.telemetry.Telemetry; +import org.opensearch.telemetry.TelemetrySettings; + +/** + * Mock {@link TelemetryPlugin} implementation for testing. + */ +public class MockTelemetryPlugin extends Plugin implements TelemetryPlugin { + private static final String MOCK_TRACER_NAME = "mock"; + + /** + * Base constructor. 
diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java new file mode 100644 index 0000000000000..41cc5c1e77a34 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test.telemetry; + +import java.util.Optional; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.TelemetryPlugin; +import org.opensearch.telemetry.Telemetry; +import org.opensearch.telemetry.TelemetrySettings; + +/** + * Mock {@link TelemetryPlugin} implementation for testing. + */ +public class MockTelemetryPlugin extends Plugin implements TelemetryPlugin { + private static final String MOCK_TRACER_NAME = "mock"; + + /** + * Base constructor. + */ + public MockTelemetryPlugin() { + + } + + @Override + public Optional<Telemetry> getTelemetry(TelemetrySettings settings) { + return Optional.of(new MockTelemetry(settings)); + } + + @Override + public String getName() { + return MOCK_TRACER_NAME; + } +} diff --git a/test/telemetry/build.gradle b/test/telemetry/build.gradle new file mode 100644 index 0000000000000..fbabe43aa5e5a --- /dev/null +++ b/test/telemetry/build.gradle @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +apply plugin: 'opensearch.build' +apply plugin: 'opensearch.publish' + +dependencies { + api project(":libs:opensearch-common") + api project(":libs:opensearch-telemetry") +} + +tasks.named('forbiddenApisMain').configure { + // package does not depend on core, so only JDK signatures should be checked + replaceSignatureFiles 'jdk-signatures' +} + +test.enabled = false
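// Illustrative sketch, not part of this diff: how a Telemetry instance is
// resolved from the plugin. The telemetrySettings parameter stands in for
// whatever TelemetrySettings the node builds from cluster settings.
import java.util.Optional;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.test.telemetry.MockTelemetryPlugin;

public final class PluginResolutionExample {
    static Telemetry resolve(TelemetrySettings telemetrySettings) {
        MockTelemetryPlugin plugin = new MockTelemetryPlugin();
        assert "mock".equals(plugin.getName()); // MOCK_TRACER_NAME
        Optional<Telemetry> telemetry = plugin.getTelemetry(telemetrySettings);
        return telemetry.orElseThrow(IllegalStateException::new);
    }
}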
diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java new file mode 100644 index 0000000000000..4779d9796e11e --- /dev/null +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java @@ -0,0 +1,159 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test.telemetry.tracing; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; +import org.opensearch.telemetry.tracing.AbstractSpan; +import org.opensearch.telemetry.tracing.Span; + +/** + * MockSpan for testing and strict-check validations. Not to be used in production. + */ +public class MockSpan extends AbstractSpan { + private final SpanProcessor spanProcessor; + private final Map<String, Object> metadata; + private final String traceId; + private final String spanId; + private boolean hasEnded; + private final Long startTime; + private Long endTime; + + private final Object lock = new Object(); + + private static final Supplier<Random> randomSupplier = ThreadLocalRandom::current; + + /** + * Base constructor. + * @param spanName span name + * @param parentSpan parent span + * @param spanProcessor span processor + */ + public MockSpan(String spanName, Span parentSpan, SpanProcessor spanProcessor) { + this( + spanName, + parentSpan, + parentSpan != null ? parentSpan.getTraceId() : IdGenerator.generateTraceId(), + IdGenerator.generateSpanId(), + spanProcessor + ); + } + + /** + * Constructor with explicit trace and span IDs. + * @param spanName Span Name + * @param parentSpan Parent Span + * @param traceId Trace ID + * @param spanId Span ID + * @param spanProcessor Span Processor + */ + public MockSpan(String spanName, Span parentSpan, String traceId, String spanId, SpanProcessor spanProcessor) { + super(spanName, parentSpan); + this.spanProcessor = spanProcessor; + this.metadata = new HashMap<>(); + this.traceId = traceId; + this.spanId = spanId; + this.startTime = System.nanoTime(); + } + + @Override + public void endSpan() { + synchronized (lock) { + if (hasEnded) { + return; + } + endTime = System.nanoTime(); + hasEnded = true; + } + spanProcessor.onEnd(this); + } + + @Override + public void addAttribute(String key, String value) { + putMetadata(key, value); + } + + @Override + public void addAttribute(String key, Long value) { + putMetadata(key, value); + } + + @Override + public void addAttribute(String key, Double value) { + putMetadata(key, value); + } + + @Override + public void addAttribute(String key, Boolean value) { + putMetadata(key, value); + } + + @Override + public void addEvent(String event) { + putMetadata(event, null); + } + + private void putMetadata(String key, Object value) { + metadata.put(key, value); + } + + @Override + public String getTraceId() { + return traceId; + } + + @Override + public String getSpanId() { + return spanId; + } + + /** + * Returns whether the span is ended or not. + * @return span end status. + */ + public boolean hasEnded() { + synchronized (lock) { + return hasEnded; + } + } + + /** + * Returns the start time of the span. + * @return start time of the span. + */ + public Long getStartTime() { + return startTime; + } + + /** + * Returns the end time of the span. + * @return end time of the span. + */ + public Long getEndTime() { + return endTime; + } + + private static class IdGenerator { + private static String generateSpanId() { + long id = randomSupplier.get().nextLong(); + return Long.toHexString(id); + } + + private static String generateTraceId() { + long idHi = randomSupplier.get().nextLong(); + long idLo = randomSupplier.get().nextLong(); + long result = idLo | (idHi << 32); + return Long.toHexString(result); + } + + } +}
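// Illustrative sketch, not part of this diff: the MockSpan lifecycle in
// miniature. The no-op SpanProcessor is an assumption made for the sketch;
// inside the test framework the spans are fed a StrictCheckSpanProcessor.
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.test.telemetry.tracing.MockSpan;
import org.opensearch.test.telemetry.tracing.SpanProcessor;

public class MockSpanExample {
    public static void main(String[] args) {
        SpanProcessor noop = new SpanProcessor() {
            @Override
            public void onStart(Span span) {}

            @Override
            public void onEnd(Span span) {}
        };
        MockSpan root = new MockSpan("root", null, noop);   // fresh trace id and span id
        MockSpan child = new MockSpan("child", root, noop); // inherits root's trace id
        child.addAttribute("shard", 0L);
        child.endSpan(); // idempotent: a second endSpan() call is a no-op
        assert child.hasEnded();
        assert root.getTraceId().equals(child.getTraceId());
    }
}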
diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java new file mode 100644 index 0000000000000..7e3f5a9031100 --- /dev/null +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test.telemetry.tracing; + +import java.util.Locale; +import java.util.Map; +import java.util.function.BiConsumer; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.TracingContextPropagator; + +/** + * Mock {@link TracingContextPropagator} to propagate the span context across internode communication. + */ +public class MockTracingContextPropagator implements TracingContextPropagator { + + private static final String TRACE_PARENT = "traceparent"; + private static final String SEPARATOR = "~"; + private final SpanProcessor spanProcessor; + + /** + * Constructor. + * @param spanProcessor span processor. + */ + public MockTracingContextPropagator(SpanProcessor spanProcessor) { + this.spanProcessor = spanProcessor; + } + + @Override + public Span extract(Map<String, String> props) { + String value = props.get(TRACE_PARENT); + if (value != null) { + String[] values = value.split(SEPARATOR); + String traceId = values[0]; + String spanId = values[1]; + return new MockSpan(null, null, traceId, spanId, spanProcessor); + } else { + return null; + } + } + + @Override + public void inject(Span currentSpan, BiConsumer<String, String> setter) { + if (currentSpan instanceof MockSpan) { + String traceId = currentSpan.getTraceId(); + String spanId = currentSpan.getSpanId(); + String traceParent = String.format(Locale.ROOT, "%s%s%s", traceId, SEPARATOR, spanId); + setter.accept(TRACE_PARENT, traceParent); + } + } +} diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java new file mode 100644 index 0000000000000..531b4ce36c36a --- /dev/null +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test.telemetry.tracing; + +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.TracingContextPropagator; +import org.opensearch.telemetry.tracing.TracingTelemetry; + +/** + * Mock {@link TracingTelemetry} implementation for testing. + */ +public class MockTracingTelemetry implements TracingTelemetry {  + + private final SpanProcessor spanProcessor = new StrictCheckSpanProcessor(); + + /** + * Base constructor. + */ + public MockTracingTelemetry() { + + } + + @Override + public Span createSpan(String spanName, Span parentSpan) { + Span span = new MockSpan(spanName, parentSpan, spanProcessor); + spanProcessor.onStart(span); + return span; + } + + @Override + public TracingContextPropagator getContextPropagator() { + return new MockTracingContextPropagator(spanProcessor); + } + + @Override + public void close() { + ((StrictCheckSpanProcessor) spanProcessor).ensureAllSpansAreClosed(); + ((StrictCheckSpanProcessor) spanProcessor).clear(); + } +}
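// Illustrative sketch, not part of this diff: the mock propagator serializes the
// span context as "traceId~spanId" under the "traceparent" key. A round trip;
// the HashMap carrier stands in for transport request headers.
import java.util.HashMap;
import java.util.Map;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.test.telemetry.tracing.MockSpan;
import org.opensearch.test.telemetry.tracing.MockTracingContextPropagator;
import org.opensearch.test.telemetry.tracing.StrictCheckSpanProcessor;

public class PropagatorRoundTripExample {
    public static void main(String[] args) {
        StrictCheckSpanProcessor processor = new StrictCheckSpanProcessor();
        MockTracingContextPropagator propagator = new MockTracingContextPropagator(processor);

        Span outgoing = new MockSpan("send", null, processor);
        Map<String, String> headers = new HashMap<>();
        propagator.inject(outgoing, headers::put); // writes "traceparent" -> "traceId~spanId"

        Span incoming = propagator.extract(headers); // rebuilds a span with the same ids
        assert outgoing.getTraceId().equals(incoming.getTraceId());
        assert outgoing.getSpanId().equals(incoming.getSpanId());
    }
}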
diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/SpanProcessor.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/SpanProcessor.java new file mode 100644 index 0000000000000..cb9d0dbd428e4 --- /dev/null +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/SpanProcessor.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test.telemetry.tracing; + +import org.opensearch.telemetry.tracing.Span; + +/** + * Processes a span and can perform any action on span start and span end. + */ +public interface SpanProcessor { + /** + * Logic to be executed on span start. + * @param span span which is starting. + */ + void onStart(Span span); + + /** + * Logic to be executed on span end. + * @param span span which is ending. + */ + void onEnd(Span span); +} diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/StrictCheckSpanProcessor.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/StrictCheckSpanProcessor.java new file mode 100644 index 0000000000000..34d4d96809755 --- /dev/null +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/StrictCheckSpanProcessor.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.test.telemetry.tracing; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.opensearch.telemetry.tracing.Span; + +/** + * Strict-check span processor that validates that every span which is started is also ended. + */ +public class StrictCheckSpanProcessor implements SpanProcessor { + private final Map<String, StackTraceElement[]> spanMap = new ConcurrentHashMap<>(); + + /** + * Base constructor. + */ + public StrictCheckSpanProcessor() { + + } + + @Override + public void onStart(Span span) { + spanMap.put(span.getSpanId(), Thread.currentThread().getStackTrace()); + } + + @Override + public void onEnd(Span span) { + spanMap.remove(span.getSpanId()); + } + + /** + * Ensures that all spans are closed. Throws an assertion error whose stack trace points at the + * method from which the un-ended span was created. This could be enhanced to report all failed + * spans in a single pass if that proves more usable. + */ + public void ensureAllSpansAreClosed() { + if (!spanMap.isEmpty()) { + for (Map.Entry<String, StackTraceElement[]> entry : spanMap.entrySet()) { + StackTraceElement[] filteredStackTrace = getFilteredStackTrace(entry.getValue()); + AssertionError error = new AssertionError( + String.format( + Locale.ROOT, + "Total [%d] spans were not ended properly. Below is the stack trace for one of the un-ended spans", + spanMap.size() + ) + ); + error.setStackTrace(filteredStackTrace); + spanMap.clear(); + throw error; + } + } + } + + /** + * Clears the state. + */ + public void clear() { + spanMap.clear(); + } + + private StackTraceElement[] getFilteredStackTrace(StackTraceElement[] stackTraceElements) { + int filteredElementsCount = 0; + while (filteredElementsCount < stackTraceElements.length) { + String className = stackTraceElements[filteredElementsCount].getClassName(); + if (className.startsWith("java.lang.Thread") + || className.startsWith("org.opensearch.telemetry") + || className.startsWith("org.opensearch.tracing")) { + filteredElementsCount++; + } else { + break; + } + } + return Arrays.copyOfRange(stackTraceElements, filteredElementsCount, stackTraceElements.length); + } +}
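// Illustrative sketch, not part of this diff: what the strict processor enforces.
// The try/catch around close() exists only to demonstrate the failure mode.
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.test.telemetry.tracing.MockTracingTelemetry;

public class StrictCheckExample {
    public static void main(String[] args) {
        MockTracingTelemetry telemetry = new MockTracingTelemetry();
        Span span = telemetry.createSpan("unfinished", null); // registered via onStart(...)
        try {
            telemetry.close(); // fails: endSpan() was never called on "unfinished"
        } catch (AssertionError expected) {
            // the error's stack trace points back at the createSpan call above
        }
        span.endSpan(); // what a well-behaved caller does before close()
    }
}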
diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/package-info.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/package-info.java new file mode 100644 index 0000000000000..83e2b4035acbf --- /dev/null +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Mock tracing and span-validation classes for the OpenSearch test framework. */ +package org.opensearch.test.telemetry.tracing;