diff --git a/CHANGELOG.md b/CHANGELOG.md index 9aa944e9d7c41..4cb48990b6737 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,10 +41,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `org.apache.commons:commons-compress` from 1.22 to 1.23.0 - Bump `org.apache.commons:commons-configuration2` from 2.8.0 to 2.9.0 - Bump `com.netflix.nebula:nebula-publishing-plugin` from 19.2.0 to 20.3.0 -- Bump `org.apache.commons:commons-compress` from 1.22 to 1.23.0 - Bump `com.diffplug.spotless` from 6.17.0 to 6.18.0 - Bump `io.opencensus:opencensus-api` from 0.18.0 to 0.31.1 ([#7291](https://github.com/opensearch-project/OpenSearch/pull/7291)) -- Bump `com.azure:azure-core` from 1.34.0 to 1.39.0 ### Changed - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) @@ -77,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) - Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230)) +- Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541)) ### Security @@ -89,6 +88,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237)) - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) +- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) +- Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466)) ### Dependencies - Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.0 @@ -102,14 +103,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `commons-io:commons-io` from 2.7 to 2.11.0 - Bump `org.apache.shiro:shiro-core` from 1.9.1 to 1.11.0 ([#7397](https://github.com/opensearch-project/OpenSearch/pull/7397)) - Bump `jetty-server` in hdfs-fixture from 9.4.49.v20220914 to 9.4.51.v20230217 ([#7405](https://github.com/opensearch-project/OpenSearch/pull/7405)) -- Bump `com.networknt:json-schema-validator` from 1.0.78 to 1.0.81 (#7460) +- OpenJDK Update (April 2023 Patch releases) ([#7448](https://github.com/opensearch-project/OpenSearch/pull/7448) - Bump `org.apache.commons:commons-compress` from 1.22 to 1.23.0 (#7462) +- Bump `com.azure:azure-core` from 1.34.0 to 1.39.0 +- Bump `com.networknt:json-schema-validator` from 1.0.78 to 1.0.81 (#7460) - Bump Apache Lucene to 9.6.0 ([#7505](https://github.com/opensearch-project/OpenSearch/pull/7505)) - Bump `com.google.cloud:google-cloud-core-http` from 1.93.3 to 2.17.0 (#7488) ### Changed - Enable `./gradlew build` on MacOS by disabling bcw tests ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303)) - Moved concurrent-search from sandbox plugin to server module behind feature flag ([#7203](https://github.com/opensearch-project/OpenSearch/pull/7203)) +- Allow access to indices cache clear APIs for read only indexes ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303)) ### Deprecated @@ -117,6 +121,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230)) +- Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541)) ### Security diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index 25fca31587cbc..0baf626142238 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -614,8 +614,9 @@ Pass a list of files or directories to limit your search. ### Lucene Snapshots -The Github workflow in [lucene-snapshots.yml](.github/workflows/lucene-snapshots.yml) is a Github worfklow executable by maintainers to build a top-down snapshot build of lucene. +The Github workflow in [lucene-snapshots.yml](.github/workflows/lucene-snapshots.yml) is a GitHub workflow executable by maintainers to build a top-down snapshot build of Lucene. These snapshots are available to test compatibility with upcoming changes to Lucene by updating the version at [version.properties](buildsrc/version.properties) with the `version-snapshot-sha` version. Example: `lucene = 10.0.0-snapshot-2e941fc`. +Note that these snapshots do not follow the Maven [naming convention](https://maven.apache.org/guides/getting-started/index.html#what-is-a-snapshot-version) with a (case sensitive) SNAPSHOT suffix, so these artifacts are considered "releases" by build systems such as the `mavenContent` repository filter in Gradle or `releases` artifact policies in Maven. ### Flaky Tests @@ -626,6 +627,6 @@ If you encounter a build/test failure in CI that is unrelated to the change in y 1. Follow failed CI links, and locate the failing test(s). 2. Copy-paste the failure into a comment of your PR. 3. Search through [issues](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aopen+is%3Aissue+label%3A%22flaky-test%22) using the name of the failed test for whether this is a known flaky test. -5. If an existing issue is found, paste a link to the known issue in a comment to your PR. -6. If no existing issue is found, open one. -7. Retry CI via the GitHub UX or by pushing an update to your PR. +4. If an existing issue is found, paste a link to the known issue in a comment to your PR. +5. If no existing issue is found, open one. +6. Retry CI via the GitHub UX or by pushing an update to your PR. diff --git a/libs/core/src/main/java/org/opensearch/Version.java b/libs/core/src/main/java/org/opensearch/Version.java index ef3876c3c33dd..a1f61fb02432e 100644 --- a/libs/core/src/main/java/org/opensearch/Version.java +++ b/libs/core/src/main/java/org/opensearch/Version.java @@ -87,7 +87,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_2_6_1 = new Version(2060199, org.apache.lucene.util.Version.LUCENE_9_5_0); public static final Version V_2_7_0 = new Version(2070099, org.apache.lucene.util.Version.LUCENE_9_5_0); public static final Version V_2_7_1 = new Version(2070199, org.apache.lucene.util.Version.LUCENE_9_5_0); - public static final Version V_2_8_0 = new Version(2080099, org.apache.lucene.util.Version.LUCENE_9_5_0); + public static final Version V_2_8_0 = new Version(2080099, org.apache.lucene.util.Version.LUCENE_9_6_0); public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_6_0); public static final Version CURRENT = V_3_0_0; diff --git a/libs/core/src/main/java/org/opensearch/core/common/io/stream/BaseStreamInput.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BaseStreamInput.java new file mode 100644 index 0000000000000..a0e50722bf01d --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BaseStreamInput.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.core.common.io.stream; + +import java.io.InputStream; + +/** + * Foundation class for reading core types off the transport stream + * + * todo: refactor {@code StreamInput} primitive readers to this class + * + * @opensearch.internal + */ +public abstract class BaseStreamInput extends InputStream {} diff --git a/libs/core/src/main/java/org/opensearch/core/common/io/stream/BaseStreamOutput.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BaseStreamOutput.java new file mode 100644 index 0000000000000..f7a8862fa5f2c --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BaseStreamOutput.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.core.common.io.stream; + +import java.io.OutputStream; + +/** + * Foundation class for writing core types over the transport stream + * + * todo: refactor {@code StreamOutput} primitive writers to this class + * + * @opensearch.internal + */ +public abstract class BaseStreamOutput extends OutputStream {} diff --git a/libs/core/src/main/java/org/opensearch/core/common/io/stream/BaseWriteable.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BaseWriteable.java new file mode 100644 index 0000000000000..56172e7c6a50e --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BaseWriteable.java @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.core.common.io.stream; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Implementers can be written to a {@code StreamOutput} and read from a {@code StreamInput}. This allows them to be "thrown + * across the wire" using OpenSearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by + * serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged. + * + * @opensearch.internal + */ +public interface BaseWriteable { + /** + * A WriteableRegistry registers {@link Writer} methods for writing data types over a + * {@link BaseStreamOutput} channel and {@link Reader} methods for reading data from a + * {@link BaseStreamInput} channel. + * + * @opensearch.internal + */ + class WriteableRegistry { + private static final Map, Writer> WRITER_REGISTRY = new ConcurrentHashMap<>(); + private static final Map> READER_REGISTRY = new ConcurrentHashMap<>(); + + /** + * registers a streamable writer + * + * @opensearch.internal + */ + public static > void registerWriter(final Class clazz, final W writer) { + if (WRITER_REGISTRY.containsKey(clazz)) { + throw new IllegalArgumentException("Streamable writer already registered for type [" + clazz.getName() + "]"); + } + WRITER_REGISTRY.put(clazz, writer); + } + + /** + * registers a streamable reader + * + * @opensearch.internal + */ + public static > void registerReader(final byte ordinal, final R reader) { + if (READER_REGISTRY.containsKey(ordinal)) { + throw new IllegalArgumentException("Streamable reader already registered for ordinal [" + (int) ordinal + "]"); + } + READER_REGISTRY.put(ordinal, reader); + } + + /** + * Returns the registered writer keyed by the class type + */ + @SuppressWarnings("unchecked") + public static > W getWriter(final Class clazz) { + return (W) WRITER_REGISTRY.get(clazz); + } + + /** + * Returns the ristered reader keyed by the unique ordinal + */ + @SuppressWarnings("unchecked") + public static > R getReader(final byte b) { + return (R) READER_REGISTRY.get(b); + } + } + + /** + * Write this into the {@linkplain BaseStreamOutput}. + */ + void writeTo(final S out) throws IOException; + + /** + * Reference to a method that can write some object to a {@link BaseStreamOutput}. + *

+ * By convention this is a method from {@link BaseStreamOutput} itself (e.g., {@code StreamOutput#writeString}). If the value can be + * {@code null}, then the "optional" variant of methods should be used! + *

+ * Most classes should implement {@code Writeable} and the {@code Writeable#writeTo(BaseStreamOutput)} method should use + * {@link BaseStreamOutput} methods directly or this indirectly: + *


+     * public void writeTo(StreamOutput out) throws IOException {
+     *     out.writeVInt(someValue);
+     *     out.writeMapOfLists(someMap, StreamOutput::writeString, StreamOutput::writeString);
+     * }
+     * 
+ */ + @FunctionalInterface + interface Writer { + + /** + * Write {@code V}-type {@code value} to the {@code out}put stream. + * + * @param out Output to write the {@code value} too + * @param value The value to add + */ + void write(final S out, V value) throws IOException; + } + + /** + * Reference to a method that can read some object from a stream. By convention this is a constructor that takes + * {@linkplain BaseStreamInput} as an argument for most classes and a static method for things like enums. Returning null from one of these + * is always wrong - for that we use methods like {@code StreamInput#readOptionalWriteable(Reader)}. + *

+ * As most classes will implement this via a constructor (or a static method in the case of enumerations), it's something that should + * look like: + *


+     * public MyClass(final StreamInput in) throws IOException {
+     *     this.someValue = in.readVInt();
+     *     this.someMap = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
+     * }
+     * 
+ */ + @FunctionalInterface + interface Reader { + + /** + * Read {@code V}-type value from a stream. + * + * @param in Input to read the value from + */ + V read(final S in) throws IOException; + } +} diff --git a/libs/core/src/main/java/org/opensearch/core/common/io/stream/package-info.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/package-info.java new file mode 100644 index 0000000000000..76d0842466b96 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** Core transport stream classes */ +package org.opensearch.core.common.io.stream; diff --git a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/BaseGeoGrid.java b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/BaseGeoGrid.java index f8cfcabc10593..9b5ed7777204e 100644 --- a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/BaseGeoGrid.java +++ b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/BaseGeoGrid.java @@ -33,7 +33,6 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.util.LongObjectPagedHashMap; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.search.aggregations.InternalAggregation; @@ -69,7 +68,7 @@ protected BaseGeoGrid(String name, int requiredSize, List buc this.buckets = buckets; } - protected abstract Writeable.Reader getBucketReader(); + protected abstract Reader getBucketReader(); /** * Read from a stream. diff --git a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoHashGrid.java b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoHashGrid.java index aa1d5504ad24f..9b6713ac033ae 100644 --- a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoHashGrid.java +++ b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoHashGrid.java @@ -76,7 +76,7 @@ protected InternalGeoHashGridBucket createBucket(long hashAsLong, long docCount, } @Override - protected Reader getBucketReader() { + protected Reader getBucketReader() { return InternalGeoHashGridBucket::new; } diff --git a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoTileGrid.java b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoTileGrid.java index 91c523c80855e..bf45080759a07 100644 --- a/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoTileGrid.java +++ b/modules/geo/src/main/java/org/opensearch/geo/search/aggregations/bucket/geogrid/GeoTileGrid.java @@ -76,7 +76,7 @@ protected InternalGeoTileGridBucket createBucket(long hashAsLong, long docCount, } @Override - protected Reader getBucketReader() { + protected Reader getBucketReader() { return InternalGeoTileGridBucket::new; } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.put_settings/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.put_settings/10_basic.yml index 825bac9f91649..4c263ac9f743a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.put_settings/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.put_settings/10_basic.yml @@ -69,3 +69,32 @@ include_defaults: true - match: {defaults.node.attr.testattr: "test"} + +--- +"Test set search backpressure mode": + + - do: + cluster.put_settings: + body: + persistent: + search_backpressure.mode: "monitor_only" + + - match: {persistent: {search_backpressure: {mode: "monitor_only"}}} + +--- +"Test set invalid search backpressure mode": + + - skip: + version: "- 2.9.99" + reason: "Parsing and validation of SearchBackpressureMode does not exist in versions < 3.0" + + - do: + catch: bad_request + cluster.put_settings: + body: + persistent: + search_backpressure.mode: "monitor-only" + + - match: {error.root_cause.0.type: "illegal_argument_exception"} + - match: { error.root_cause.0.reason: "Invalid SearchBackpressureMode: monitor-only" } + - match: { status: 400 } diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java index 126bb57501abb..305b39ac3ad0c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java @@ -55,7 +55,12 @@ public void testClearIndicesCacheWithBlocks() { NumShards numShards = getNumShards("test"); // Request is not blocked - for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) { + for (String blockSetting : Arrays.asList( + SETTING_BLOCKS_READ, + SETTING_BLOCKS_WRITE, + SETTING_READ_ONLY, + SETTING_READ_ONLY_ALLOW_DELETE + )) { try { enableIndexBlock("test", blockSetting); ClearIndicesCacheResponse clearIndicesCacheResponse = client().admin() @@ -73,7 +78,7 @@ public void testClearIndicesCacheWithBlocks() { } } // Request is blocked - for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA, SETTING_READ_ONLY_ALLOW_DELETE)) { + for (String blockSetting : Arrays.asList(SETTING_BLOCKS_METADATA)) { try { enableIndexBlock("test", blockSetting); assertBlocked( diff --git a/server/src/internalClusterTest/java/org/opensearch/search/preference/SearchPreferenceIT.java b/server/src/internalClusterTest/java/org/opensearch/search/preference/SearchPreferenceIT.java index a9d40c03839c6..1b68320bcbdf5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/preference/SearchPreferenceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/preference/SearchPreferenceIT.java @@ -142,33 +142,27 @@ public void testSimplePreference() { client().prepareIndex("test").setSource("field1", "value1").get(); refresh(); - SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet(); + SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); - searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet(); - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); - searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet(); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary_first").execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); + searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); - searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet(); - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); - searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet(); - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); - searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get(); - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); } public void testThatSpecifyingNonExistingNodesReturnsUsefulError() { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java index 538c71b17b2fd..8601e2b6d6be9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java @@ -148,7 +148,6 @@ public void testProfileMatchesRegular() throws Exception { .setProfile(false) .addSort("id.keyword", SortOrder.ASC) .setSearchType(SearchType.QUERY_THEN_FETCH) - .setPreference("_primary") .setRequestCache(false); SearchRequestBuilder profile = client().prepareSearch("test") @@ -156,7 +155,6 @@ public void testProfileMatchesRegular() throws Exception { .setProfile(true) .addSort("id.keyword", SortOrder.ASC) .setSearchType(SearchType.QUERY_THEN_FETCH) - .setPreference("_primary") .setRequestCache(false); MultiSearchResponse.Item[] responses = client().prepareMultiSearch().add(vanilla).add(profile).get().getResponses(); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/server/src/main/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index 12f1c78cea0c7..bd94866ace553 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -129,11 +129,11 @@ protected ShardsIterator shards(ClusterState clusterState, ClearIndicesCacheRequ @Override protected ClusterBlockException checkGlobalBlock(ClusterState state, ClearIndicesCacheRequest request) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); } @Override protected ClusterBlockException checkRequestBlock(ClusterState state, ClearIndicesCacheRequest request, String[] concreteIndices) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices); + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices); } } diff --git a/server/src/main/java/org/opensearch/common/geo/GeoBoundingBox.java b/server/src/main/java/org/opensearch/common/geo/GeoBoundingBox.java index 8e3db45854f28..9609c10b6614f 100644 --- a/server/src/main/java/org/opensearch/common/geo/GeoBoundingBox.java +++ b/server/src/main/java/org/opensearch/common/geo/GeoBoundingBox.java @@ -79,8 +79,8 @@ public GeoBoundingBox(GeoPoint topLeft, GeoPoint bottomRight) { } public GeoBoundingBox(StreamInput input) throws IOException { - this.topLeft = input.readGeoPoint(); - this.bottomRight = input.readGeoPoint(); + this.topLeft = new GeoPoint(input); + this.bottomRight = new GeoPoint(input); } public boolean isUnbounded() { @@ -164,8 +164,8 @@ public boolean pointInBounds(double lon, double lat) { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeGeoPoint(topLeft); - out.writeGeoPoint(bottomRight); + topLeft.writeTo(out); + bottomRight.writeTo(out); } @Override diff --git a/server/src/main/java/org/opensearch/common/geo/GeoPoint.java b/server/src/main/java/org/opensearch/common/geo/GeoPoint.java index b5c2d6a846f92..874f0ffb80be1 100644 --- a/server/src/main/java/org/opensearch/common/geo/GeoPoint.java +++ b/server/src/main/java/org/opensearch/common/geo/GeoPoint.java @@ -40,6 +40,11 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.OpenSearchParseException; import org.opensearch.common.geo.GeoUtils.EffectivePoint; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.BaseWriteable.Reader; +import org.opensearch.core.common.io.stream.BaseWriteable.Writer; +import org.opensearch.core.common.io.stream.BaseWriteable.WriteableRegistry; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.geometry.Geometry; @@ -87,6 +92,22 @@ public GeoPoint(GeoPoint template) { this(template.getLat(), template.getLon()); } + public GeoPoint(final StreamInput in) throws IOException { + this.lat = in.readDouble(); + this.lon = in.readDouble(); + } + + /** + * Register this type as a streamable so it can be serialized over the wire + */ + public static void registerStreamables() { + WriteableRegistry.>registerWriter(GeoPoint.class, (o, v) -> { + o.writeByte((byte) 22); + ((GeoPoint) v).writeTo(o); + }); + WriteableRegistry.>registerReader(Byte.valueOf((byte) 22), GeoPoint::new); + } + public GeoPoint reset(double lat, double lon) { this.lat = lat; this.lon = lon; @@ -210,6 +231,11 @@ public GeoPoint resetFromGeoHash(long geohashLong) { return this.resetFromIndexHash(BitUtil.flipFlop((geohashLong >>> 4) << ((level * 5) + 2))); } + public void writeTo(final StreamOutput out) throws IOException { + out.writeDouble(this.lat); + out.writeDouble(this.lon); + } + public double lat() { return this.lat; } diff --git a/server/src/main/java/org/opensearch/common/io/stream/StreamInput.java b/server/src/main/java/org/opensearch/common/io/stream/StreamInput.java index f1b4ffe2219aa..2b51d6c469fcf 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/StreamInput.java @@ -50,20 +50,18 @@ import org.opensearch.common.Strings; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.geo.GeoPoint; import org.opensearch.common.settings.SecureString; import org.opensearch.common.text.Text; -import org.opensearch.common.time.DateUtils; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.BaseStreamInput; +import org.opensearch.core.common.io.stream.BaseWriteable; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.script.JodaCompatibleZonedDateTime; import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.FilterInputStream; import java.io.IOException; -import java.io.InputStream; import java.math.BigInteger; import java.nio.file.AccessDeniedException; import java.nio.file.AtomicMoveNotSupportedException; @@ -108,7 +106,7 @@ * * @opensearch.internal */ -public abstract class StreamInput extends InputStream { +public abstract class StreamInput extends BaseStreamInput { private Version version = Version.CURRENT; @@ -686,6 +684,11 @@ public Map readMap() throws IOException { @Nullable public Object readGenericValue() throws IOException { byte type = readByte(); + BaseWriteable.Reader r = BaseWriteable.WriteableRegistry.getReader(type); + if (r != null) { + return r.read(this); + } + switch (type) { case -1: return null; @@ -715,8 +718,6 @@ public Object readGenericValue() throws IOException { return readByte(); case 12: return readDate(); - case 13: - return readDateTime(); case 14: return readBytesReference(); case 15: @@ -733,8 +734,6 @@ public Object readGenericValue() throws IOException { return readDoubleArray(); case 21: return readBytesRef(); - case 22: - return readGeoPoint(); case 23: return readZonedDateTime(); case 24: @@ -778,14 +777,6 @@ private List readArrayList() throws IOException { return list; } - private JodaCompatibleZonedDateTime readDateTime() throws IOException { - // we reuse DateTime to communicate with older nodes that don't know about the joda compat layer, but - // here we are on a new node so we always want a compat datetime - final ZoneId zoneId = DateUtils.dateTimeZoneToZoneId(DateTimeZone.forID(readString())); - long millis = readLong(); - return new JodaCompatibleZonedDateTime(Instant.ofEpochMilli(millis), zoneId); - } - private ZonedDateTime readZonedDateTime() throws IOException { final String timeZoneId = readString(); return ZonedDateTime.ofInstant(Instant.ofEpochMilli(readLong()), ZoneId.of(timeZoneId)); @@ -833,13 +824,6 @@ private Date readDate() throws IOException { return new Date(readLong()); } - /** - * Reads a {@link GeoPoint} from this stream input - */ - public GeoPoint readGeoPoint() throws IOException { - return new GeoPoint(readDouble(), readDouble()); - } - /** * Read a {@linkplain DateTimeZone}. */ @@ -1181,7 +1165,7 @@ public C readOptionalNamedWriteable(Class category * @return the list of objects * @throws IOException if an I/O exception occurs reading the list */ - public List readList(final Writeable.Reader reader) throws IOException { + public List readList(final BaseWriteable.Reader reader) throws IOException { return readCollection(reader, ArrayList::new, Collections.emptyList()); } @@ -1223,8 +1207,11 @@ public Set readSet(Writeable.Reader reader) throws IOException { /** * Reads a collection of objects */ - private > C readCollection(Writeable.Reader reader, IntFunction constructor, C empty) - throws IOException { + private > C readCollection( + BaseWriteable.Reader reader, + IntFunction constructor, + C empty + ) throws IOException { int count = readArraySize(); if (count == 0) { return empty; diff --git a/server/src/main/java/org/opensearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/opensearch/common/io/stream/StreamOutput.java index da17cddfcf97b..b0f4f6c8a6139 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/StreamOutput.java @@ -51,18 +51,17 @@ import org.opensearch.common.Nullable; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.geo.GeoPoint; import org.opensearch.common.io.stream.Writeable.Writer; import org.opensearch.common.settings.SecureString; import org.opensearch.common.text.Text; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.BaseStreamOutput; +import org.opensearch.core.common.io.stream.BaseWriteable; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.script.JodaCompatibleZonedDateTime; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.OutputStream; import java.math.BigInteger; import java.nio.file.AccessDeniedException; import java.nio.file.AtomicMoveNotSupportedException; @@ -103,7 +102,7 @@ * * @opensearch.internal */ -public abstract class StreamOutput extends OutputStream { +public abstract class StreamOutput extends BaseStreamOutput { private static final int MAX_NESTED_EXCEPTION_LEVEL = 100; @@ -663,10 +662,10 @@ public final void writeOptionalInstant(@Nullable Instant instant) throws IOExcep } } - private static final Map, Writer> WRITERS; + private static final Map, BaseWriteable.Writer> WRITERS; static { - Map, Writer> writers = new HashMap<>(); + Map, BaseWriteable.Writer> writers = new HashMap<>(); writers.put(String.class, (o, v) -> { o.writeByte((byte) 0); o.writeString((String) v); @@ -773,25 +772,12 @@ public final void writeOptionalInstant(@Nullable Instant instant) throws IOExcep o.writeByte((byte) 21); o.writeBytesRef((BytesRef) v); }); - writers.put(GeoPoint.class, (o, v) -> { - o.writeByte((byte) 22); - o.writeGeoPoint((GeoPoint) v); - }); writers.put(ZonedDateTime.class, (o, v) -> { o.writeByte((byte) 23); final ZonedDateTime zonedDateTime = (ZonedDateTime) v; o.writeString(zonedDateTime.getZone().getId()); o.writeLong(zonedDateTime.toInstant().toEpochMilli()); }); - writers.put(JodaCompatibleZonedDateTime.class, (o, v) -> { - // write the joda compatibility datetime as joda datetime - o.writeByte((byte) 13); - final JodaCompatibleZonedDateTime zonedDateTime = (JodaCompatibleZonedDateTime) v; - String zoneId = zonedDateTime.getZonedDateTime().getZone().getId(); - // joda does not understand "Z" for utc, so we must special case - o.writeString(zoneId.equals("Z") ? DateTimeZone.UTC.getID() : zoneId); - o.writeLong(zonedDateTime.toInstant().toEpochMilli()); - }); writers.put(Set.class, (o, v) -> { if (v instanceof LinkedHashSet) { o.writeByte((byte) 24); @@ -838,7 +824,12 @@ public void writeGenericValue(@Nullable Object value) throws IOException { return; } final Class type = getGenericType(value); - final Writer writer = WRITERS.get(type); + BaseWriteable.Writer writer = BaseWriteable.WriteableRegistry.getWriter(type); + if (writer == null) { + // fallback to this local hashmap + // todo: move all writers to the registry + writer = WRITERS.get(type); + } if (writer != null) { writer.write(this, value); } else { @@ -1145,14 +1136,6 @@ public void writeOptionalNamedWriteable(@Nullable NamedWriteable namedWriteable) } } - /** - * Writes the given {@link GeoPoint} to the stream - */ - public void writeGeoPoint(GeoPoint geoPoint) throws IOException { - writeDouble(geoPoint.lat()); - writeDouble(geoPoint.lon()); - } - /** * Write a {@linkplain DateTimeZone} to the stream. */ @@ -1193,7 +1176,7 @@ public void writeOptionalZoneId(@Nullable ZoneId timeZone) throws IOException { /** * Writes a collection to this stream. The corresponding collection can be read from a stream input using - * {@link StreamInput#readList(Writeable.Reader)}. + * {@link StreamInput#readList(BaseWriteable.Reader)}. * * @param collection the collection to write to this stream * @throws IOException if an I/O exception occurs writing the collection @@ -1224,7 +1207,7 @@ public void writeCollection(final Collection collection, final Writer /** * Writes a collection of a strings. The corresponding collection can be read from a stream input using - * {@link StreamInput#readList(Writeable.Reader)}. + * {@link StreamInput#readList(BaseWriteable.Reader)}. * * @param collection the collection of strings * @throws IOException if an I/O exception occurs writing the collection @@ -1235,7 +1218,7 @@ public void writeStringCollection(final Collection collection) throws IO /** * Writes an optional collection of a strings. The corresponding collection can be read from a stream input using - * {@link StreamInput#readList(Writeable.Reader)}. + * {@link StreamInput#readList(BaseWriteable.Reader)}. * * @param collection the collection of strings * @throws IOException if an I/O exception occurs writing the collection diff --git a/server/src/main/java/org/opensearch/common/io/stream/Writeable.java b/server/src/main/java/org/opensearch/common/io/stream/Writeable.java index 5fd227db6ca83..c04cd7977fdc0 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/Writeable.java +++ b/server/src/main/java/org/opensearch/common/io/stream/Writeable.java @@ -32,6 +32,8 @@ package org.opensearch.common.io.stream; +import org.opensearch.core.common.io.stream.BaseWriteable; + import java.io.IOException; /** @@ -41,7 +43,7 @@ * * @opensearch.internal */ -public interface Writeable { +public interface Writeable extends BaseWriteable { /** * Write this into the {@linkplain StreamOutput}. @@ -64,17 +66,7 @@ public interface Writeable { * */ @FunctionalInterface - interface Writer { - - /** - * Write {@code V}-type {@code value} to the {@code out}put stream. - * - * @param out Output to write the {@code value} too - * @param value The value to add - */ - void write(StreamOutput out, V value) throws IOException; - - } + interface Writer extends BaseWriteable.Writer {} /** * Reference to a method that can read some object from a stream. By convention this is a constructor that takes @@ -91,15 +83,6 @@ interface Writer { * */ @FunctionalInterface - interface Reader { - - /** - * Read {@code V}-type value from a stream. - * - * @param in Input to read the value from - */ - V read(StreamInput in) throws IOException; - - } + interface Reader extends BaseWriteable.Reader {} } diff --git a/server/src/main/java/org/opensearch/index/query/GeoDistanceQueryBuilder.java b/server/src/main/java/org/opensearch/index/query/GeoDistanceQueryBuilder.java index e3ad2781f5546..bc52d8fe6a6df 100644 --- a/server/src/main/java/org/opensearch/index/query/GeoDistanceQueryBuilder.java +++ b/server/src/main/java/org/opensearch/index/query/GeoDistanceQueryBuilder.java @@ -113,7 +113,7 @@ public GeoDistanceQueryBuilder(StreamInput in) throws IOException { fieldName = in.readString(); distance = in.readDouble(); validationMethod = GeoValidationMethod.readFromStream(in); - center = in.readGeoPoint(); + center = new GeoPoint(in); geoDistance = GeoDistance.readFromStream(in); ignoreUnmapped = in.readBoolean(); } @@ -123,7 +123,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { out.writeString(fieldName); out.writeDouble(distance); validationMethod.writeTo(out); - out.writeGeoPoint(center); + center.writeTo(out); geoDistance.writeTo(out); out.writeBoolean(ignoreUnmapped); } diff --git a/server/src/main/java/org/opensearch/index/query/GeoPolygonQueryBuilder.java b/server/src/main/java/org/opensearch/index/query/GeoPolygonQueryBuilder.java index e0c7d44a08fac..f5e40fec78c25 100644 --- a/server/src/main/java/org/opensearch/index/query/GeoPolygonQueryBuilder.java +++ b/server/src/main/java/org/opensearch/index/query/GeoPolygonQueryBuilder.java @@ -114,7 +114,7 @@ public GeoPolygonQueryBuilder(StreamInput in) throws IOException { int size = in.readVInt(); shell = new ArrayList<>(size); for (int i = 0; i < size; i++) { - shell.add(in.readGeoPoint()); + shell.add(new GeoPoint(in)); } validationMethod = GeoValidationMethod.readFromStream(in); ignoreUnmapped = in.readBoolean(); @@ -125,7 +125,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { out.writeString(fieldName); out.writeVInt(shell.size()); for (GeoPoint point : shell) { - out.writeGeoPoint(point); + point.writeTo(out); } validationMethod.writeTo(out); out.writeBoolean(ignoreUnmapped); diff --git a/server/src/main/java/org/opensearch/script/JodaCompatibleZonedDateTime.java b/server/src/main/java/org/opensearch/script/JodaCompatibleZonedDateTime.java index 08306b3f275a8..8d14a4fae992d 100644 --- a/server/src/main/java/org/opensearch/script/JodaCompatibleZonedDateTime.java +++ b/server/src/main/java/org/opensearch/script/JodaCompatibleZonedDateTime.java @@ -32,8 +32,15 @@ package org.opensearch.script; +import org.joda.time.DateTimeZone; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.BaseWriteable.Reader; +import org.opensearch.core.common.io.stream.BaseWriteable.Writer; import org.opensearch.common.time.DateFormatter; +import org.opensearch.common.time.DateUtils; +import org.opensearch.core.common.io.stream.BaseWriteable.WriteableRegistry; import java.time.DayOfWeek; import java.time.Instant; @@ -77,6 +84,26 @@ public JodaCompatibleZonedDateTime(Instant instant, ZoneId zone) { this.dt = ZonedDateTime.ofInstant(instant, zone); } + /** + * Register this type as a streamable so it can be serialized over the wire + */ + public static void registerStreamables() { + WriteableRegistry.>registerWriter(JodaCompatibleZonedDateTime.class, (o, v) -> { + // write the joda compatibility datetime as joda datetime + o.writeByte((byte) 13); + final JodaCompatibleZonedDateTime zonedDateTime = (JodaCompatibleZonedDateTime) v; + String zoneId = zonedDateTime.getZonedDateTime().getZone().getId(); + // joda does not understand "Z" for utc, so we must special case + o.writeString(zoneId.equals("Z") ? DateTimeZone.UTC.getID() : zoneId); + o.writeLong(zonedDateTime.toInstant().toEpochMilli()); + }); + WriteableRegistry.>registerReader(Byte.valueOf((byte) 13), (i) -> { + final ZoneId zoneId = DateUtils.dateTimeZoneToZoneId(DateTimeZone.forID(i.readString())); + long millis = i.readLong(); + return new JodaCompatibleZonedDateTime(Instant.ofEpochMilli(millis), zoneId); + }); + } + // access the underlying ZonedDateTime public ZonedDateTime getZonedDateTime() { return dt; diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureMode.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureMode.java index a0e4e3c0d25aa..72b58f1d3de02 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureMode.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureMode.java @@ -8,6 +8,8 @@ package org.opensearch.search.backpressure.settings; +import java.util.Locale; + /** * Defines the search backpressure mode. */ @@ -38,7 +40,7 @@ public String getName() { } public static SearchBackpressureMode fromName(String name) { - switch (name) { + switch (name.toLowerCase(Locale.ROOT)) { case "disabled": return DISABLED; case "monitor_only": diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java index a80f399460ff4..d20e3e50d419f 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java @@ -43,9 +43,10 @@ private static class Defaults { * Defines the search backpressure mode. It can be either "disabled", "monitor_only" or "enforced". */ private volatile SearchBackpressureMode mode; - public static final Setting SETTING_MODE = Setting.simpleString( + public static final Setting SETTING_MODE = new Setting<>( "search_backpressure.mode", Defaults.MODE, + SearchBackpressureMode::fromName, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -113,8 +114,8 @@ public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSett interval = new TimeValue(SETTING_INTERVAL_MILLIS.get(settings)); - mode = SearchBackpressureMode.fromName(SETTING_MODE.get(settings)); - clusterSettings.addSettingsUpdateConsumer(SETTING_MODE, s -> this.setMode(SearchBackpressureMode.fromName(s))); + mode = SETTING_MODE.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_MODE, this::setMode); clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATIO, searchShardTaskSettings::setCancellationRatio); clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATE, searchShardTaskSettings::setCancellationRate); clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, searchShardTaskSettings::setCancellationBurst); diff --git a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java index 062d3c0b41116..05a5e4cfa9833 100644 --- a/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java @@ -270,7 +270,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException { fetchFields = in.readList(FieldAndFormat::new); } pointInTimeBuilder = in.readOptionalWriteable(PointInTimeBuilder::new); - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO: Update if/when we backport to 2.x + if (in.getVersion().onOrAfter(Version.V_2_8_0)) { if (in.readBoolean()) { searchPipelineSource = in.readMap(); } @@ -334,7 +334,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeList(fetchFields); } out.writeOptionalWriteable(pointInTimeBuilder); - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO: Update if/when we backport to 2.x + if (out.getVersion().onOrAfter(Version.V_2_8_0)) { out.writeBoolean(searchPipelineSource != null); if (searchPipelineSource != null) { out.writeMap(searchPipelineSource); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 2b5dee229b8cb..e96f1d11c89d0 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -44,6 +44,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.Strings; import org.opensearch.common.component.AbstractLifecycleComponent; +import org.opensearch.common.geo.GeoPoint; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; @@ -61,6 +62,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.node.NodeClosedException; import org.opensearch.node.ReportingService; +import org.opensearch.script.JodaCompatibleZonedDateTime; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import org.opensearch.threadpool.Scheduler; @@ -161,6 +163,14 @@ public boolean isClosed() { public void close() {} }; + static { + // registers server specific streamables + registerStreamables(); + } + + /** does nothing. easy way to ensure class is loaded */ + public static void ensureClassloaded() {} + /** * Build the service. * @@ -231,6 +241,15 @@ public TransportService( ); } + /** + * Registers server specific types as a streamables for serialization + * over the {@link StreamOutput} and {@link StreamInput} wire + */ + private static void registerStreamables() { + JodaCompatibleZonedDateTime.registerStreamables(); + GeoPoint.registerStreamables(); + } + public RemoteClusterService getRemoteClusterService() { return remoteClusterService; } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheActionTests.java new file mode 100644 index 0000000000000..cce0927d4994f --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheActionTests.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.cache.clear; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlock; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.indices.IndicesService; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.TransportService; + +import java.util.EnumSet; + +import static org.mockito.Mockito.mock; + +public class TransportClearIndicesCacheActionTests extends OpenSearchTestCase { + private final TransportClearIndicesCacheAction action = new TransportClearIndicesCacheAction( + mock(ClusterService.class), + mock(TransportService.class), + mock(IndicesService.class), + mock(ActionFilters.class), + mock(IndexNameExpressionResolver.class) + ); + + private final ClusterBlock writeClusterBlock = new ClusterBlock( + 1, + "uuid", + "", + true, + true, + true, + RestStatus.OK, + EnumSet.of(ClusterBlockLevel.METADATA_WRITE) + ); + + private final ClusterBlock readClusterBlock = new ClusterBlock( + 1, + "uuid", + "", + true, + true, + true, + RestStatus.OK, + EnumSet.of(ClusterBlockLevel.METADATA_READ) + ); + + public void testGlobalBlockCheck() { + ClusterBlocks.Builder builder = ClusterBlocks.builder(); + builder.addGlobalBlock(writeClusterBlock); + ClusterState metadataWriteBlockedState = ClusterState.builder(ClusterState.EMPTY_STATE).blocks(builder).build(); + assertNull(action.checkGlobalBlock(metadataWriteBlockedState, new ClearIndicesCacheRequest())); + + builder = ClusterBlocks.builder(); + builder.addGlobalBlock(readClusterBlock); + ClusterState metadataReadBlockedState = ClusterState.builder(ClusterState.EMPTY_STATE).blocks(builder).build(); + assertNotNull(action.checkGlobalBlock(metadataReadBlockedState, new ClearIndicesCacheRequest())); + } + + public void testIndexBlockCheck() { + String indexName = "test"; + ClusterBlocks.Builder builder = ClusterBlocks.builder(); + builder.addIndexBlock(indexName, writeClusterBlock); + ClusterState metadataWriteBlockedState = ClusterState.builder(ClusterState.EMPTY_STATE).blocks(builder).build(); + assertNull(action.checkRequestBlock(metadataWriteBlockedState, new ClearIndicesCacheRequest(), new String[] { indexName })); + + builder = ClusterBlocks.builder(); + builder.addIndexBlock(indexName, readClusterBlock); + ClusterState metadataReadBlockedState = ClusterState.builder(ClusterState.EMPTY_STATE).blocks(builder).build(); + assertNotNull(action.checkRequestBlock(metadataReadBlockedState, new ClearIndicesCacheRequest(), new String[] { indexName })); + } +} diff --git a/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java b/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java index b92e59e43e0db..bd970be5e977d 100644 --- a/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java +++ b/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java @@ -40,12 +40,14 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.SecureString; +import org.opensearch.script.JodaCompatibleZonedDateTime; import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; import java.time.Instant; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -396,6 +398,18 @@ public void testOptionalInstantSerialization() throws IOException { } } + public void testJodaDateTimeSerialization() throws IOException { + final BytesStreamOutput output = new BytesStreamOutput(); + long millis = randomIntBetween(0, Integer.MAX_VALUE); + JodaCompatibleZonedDateTime time = new JodaCompatibleZonedDateTime(Instant.ofEpochMilli(millis), ZoneOffset.ofHours(-7)); + output.writeGenericValue(time); + + final BytesReference bytesReference = output.bytes(); + final StreamInput input = getStreamInput(bytesReference); + Object inTime = input.readGenericValue(); + assertEquals(time, inTime); + } + static final class WriteableString implements Writeable { final String string; diff --git a/server/src/test/java/org/opensearch/common/io/stream/BytesStreamsTests.java b/server/src/test/java/org/opensearch/common/io/stream/BytesStreamsTests.java index bff35ad9fc975..36b84560bce41 100644 --- a/server/src/test/java/org/opensearch/common/io/stream/BytesStreamsTests.java +++ b/server/src/test/java/org/opensearch/common/io/stream/BytesStreamsTests.java @@ -639,9 +639,9 @@ public void testReadWriteGeoPoint() throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { GeoPoint geoPoint = new GeoPoint(randomDouble(), randomDouble()); - out.writeGeoPoint(geoPoint); + geoPoint.writeTo(out); StreamInput wrap = out.bytes().streamInput(); - GeoPoint point = wrap.readGeoPoint(); + GeoPoint point = new GeoPoint(wrap); assertEquals(point, geoPoint); } } diff --git a/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java b/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java index cfc123e210a16..b15d3518e2f99 100644 --- a/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java +++ b/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java @@ -240,48 +240,46 @@ public void testOldMaxClauseCountSetting() { ); } - public void testDynamicNodeSettingsRegistration() throws Exception { - try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.EXTENSIONS)) { - Settings settings = Settings.builder().put("some.custom.setting", "2.0").build(); - SettingsModule module = new SettingsModule(settings, Setting.floatSetting("some.custom.setting", 1.0f, Property.NodeScope)); - assertNotNull(module.getClusterSettings().get("some.custom.setting")); - // For unregistered setting the value is expected to be null - assertNull(module.getClusterSettings().get("some.custom.setting2")); - assertInstanceBinding(module, Settings.class, (s) -> s == settings); + public void testDynamicNodeSettingsRegistration() { + FeatureFlagSetter.set(FeatureFlags.EXTENSIONS); + Settings settings = Settings.builder().put("some.custom.setting", "2.0").build(); + SettingsModule module = new SettingsModule(settings, Setting.floatSetting("some.custom.setting", 1.0f, Property.NodeScope)); + assertNotNull(module.getClusterSettings().get("some.custom.setting")); + // For unregistered setting the value is expected to be null + assertNull(module.getClusterSettings().get("some.custom.setting2")); + assertInstanceBinding(module, Settings.class, (s) -> s == settings); - assertTrue(module.registerDynamicSetting(Setting.floatSetting("some.custom.setting2", 1.0f, Property.NodeScope))); - assertNotNull(module.getClusterSettings().get("some.custom.setting2")); - // verify if some.custom.setting still exists - assertNotNull(module.getClusterSettings().get("some.custom.setting")); + assertTrue(module.registerDynamicSetting(Setting.floatSetting("some.custom.setting2", 1.0f, Property.NodeScope))); + assertNotNull(module.getClusterSettings().get("some.custom.setting2")); + // verify if some.custom.setting still exists + assertNotNull(module.getClusterSettings().get("some.custom.setting")); - // verify exception is thrown when setting registration fails - expectThrows( - SettingsException.class, - () -> module.registerDynamicSetting(Setting.floatSetting("some.custom.setting", 1.0f, Property.NodeScope)) - ); - } + // verify exception is thrown when setting registration fails + expectThrows( + SettingsException.class, + () -> module.registerDynamicSetting(Setting.floatSetting("some.custom.setting", 1.0f, Property.NodeScope)) + ); } - public void testDynamicIndexSettingsRegistration() throws Exception { - try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.EXTENSIONS)) { - Settings settings = Settings.builder().put("some.custom.setting", "2.0").build(); - SettingsModule module = new SettingsModule(settings, Setting.floatSetting("some.custom.setting", 1.0f, Property.NodeScope)); - assertNotNull(module.getClusterSettings().get("some.custom.setting")); - // For unregistered setting the value is expected to be null - assertNull(module.getIndexScopedSettings().get("index.custom.setting2")); - assertInstanceBinding(module, Settings.class, (s) -> s == settings); + public void testDynamicIndexSettingsRegistration() { + FeatureFlagSetter.set(FeatureFlags.EXTENSIONS); + Settings settings = Settings.builder().put("some.custom.setting", "2.0").build(); + SettingsModule module = new SettingsModule(settings, Setting.floatSetting("some.custom.setting", 1.0f, Property.NodeScope)); + assertNotNull(module.getClusterSettings().get("some.custom.setting")); + // For unregistered setting the value is expected to be null + assertNull(module.getIndexScopedSettings().get("index.custom.setting2")); + assertInstanceBinding(module, Settings.class, (s) -> s == settings); - assertTrue(module.registerDynamicSetting(Setting.floatSetting("index.custom.setting2", 1.0f, Property.IndexScope))); - assertNotNull(module.getIndexScopedSettings().get("index.custom.setting2")); + assertTrue(module.registerDynamicSetting(Setting.floatSetting("index.custom.setting2", 1.0f, Property.IndexScope))); + assertNotNull(module.getIndexScopedSettings().get("index.custom.setting2")); - // verify if some.custom.setting still exists - assertNotNull(module.getClusterSettings().get("some.custom.setting")); + // verify if some.custom.setting still exists + assertNotNull(module.getClusterSettings().get("some.custom.setting")); - // verify exception is thrown when setting registration fails - expectThrows( - SettingsException.class, - () -> module.registerDynamicSetting(Setting.floatSetting("index.custom.setting2", 1.0f, Property.IndexScope)) - ); - } + // verify exception is thrown when setting registration fails + expectThrows( + SettingsException.class, + () -> module.registerDynamicSetting(Setting.floatSetting("index.custom.setting2", 1.0f, Property.IndexScope)) + ); } } diff --git a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java index ca16efdf11d7d..f175308482b15 100644 --- a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java +++ b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java @@ -15,12 +15,11 @@ public class FeatureFlagTests extends OpenSearchTestCase { private final String FLAG_PREFIX = "opensearch.experimental.feature."; - public void testFeatureFlagSet() throws Exception { + public void testFeatureFlagSet() { final String testFlag = FLAG_PREFIX + "testFlag"; - try (FeatureFlagSetter f = FeatureFlagSetter.set(testFlag)) { - assertNotNull(System.getProperty(testFlag)); - assertTrue(FeatureFlags.isEnabled(testFlag)); - } + FeatureFlagSetter.set(testFlag); + assertNotNull(System.getProperty(testFlag)); + assertTrue(FeatureFlags.isEnabled(testFlag)); } public void testMissingFeatureFlag() { diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 65fb0322ffa2f..7bee57bb2beec 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -44,6 +44,7 @@ import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.env.EnvironmentSettingsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -62,7 +63,6 @@ import org.opensearch.common.settings.WriteableSetting.SettingType; import org.opensearch.common.settings.SettingsModule; import org.opensearch.common.transport.TransportAddress; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.env.Environment; @@ -94,8 +94,6 @@ import org.opensearch.usage.UsageService; public class ExtensionsManagerTests extends OpenSearchTestCase { - - private FeatureFlagSetter featureFlagSetter; private TransportService transportService; private ActionModule actionModule; private RestController restController; @@ -134,7 +132,7 @@ public class ExtensionsManagerTests extends OpenSearchTestCase { @Before public void setup() throws Exception { - featureFlagSetter = FeatureFlagSetter.set(FeatureFlags.EXTENSIONS); + FeatureFlagSetter.set(FeatureFlags.EXTENSIONS); Settings settings = Settings.builder().put("cluster.name", "test").build(); transport = new MockNioTransport( settings, @@ -195,7 +193,6 @@ public void tearDown() throws Exception { transportService.close(); client.close(); ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); - featureFlagSetter.close(); } public void testDiscover() throws Exception { diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 3d4d505147a04..8f65271b9fcc9 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -1045,18 +1045,17 @@ public void testSetRemoteTranslogBufferIntervalFailsWhenEmpty() { @SuppressForbidden(reason = "sets the SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY feature flag") public void testExtendedCompatibilityVersionForRemoteSnapshot() throws Exception { - try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { - IndexMetadata metadata = newIndexMeta( - "index", - Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) - .build() - ); - IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); - assertTrue(settings.isRemoteSnapshot()); - assertEquals(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION, settings.getExtendedCompatibilitySnapshotVersion()); - } + FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY); + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + .build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertTrue(settings.isRemoteSnapshot()); + assertEquals(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION, settings.getExtendedCompatibilitySnapshotVersion()); } public void testExtendedCompatibilityVersionForNonRemoteSnapshot() { diff --git a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java index fed07e3805642..b23394d1edd9c 100644 --- a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java @@ -246,21 +246,18 @@ public void testReadOldIndices() throws Exception { final String pathToTestIndex = "/indices/bwc/es-6.3.0/testIndex-es-6.3.0.zip"; Path tmp = createTempDir(); TestUtil.unzip(getClass().getResourceAsStream(pathToTestIndex), tmp); - try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - "index", - Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) - .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) - .build() - ); - try (Store store = createStore(newFSDirectory(tmp))) { - EngineConfig config = config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - try ( - ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, new TranslogStats(), true, Function.identity(), true) - ) { - assertVisibleCount(readOnlyEngine, 1, false); - } + FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + .build() + ); + try (Store store = createStore(newFSDirectory(tmp))) { + EngineConfig config = config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, new TranslogStats(), true, Function.identity(), true)) { + assertVisibleCount(readOnlyEngine, 1, false); } } } diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index b810aa7d197f2..b11e8554027b1 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -1273,7 +1273,8 @@ public void testReadSegmentsFromOldIndices() throws Exception { final ShardId shardId = new ShardId("index", "_na_", 1); Store store = null; - try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { + try { + FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( "index", Settings.builder() diff --git a/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettingsTests.java b/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettingsTests.java new file mode 100644 index 0000000000000..a02ca3cf877ad --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettingsTests.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; + +public class SearchBackpressureSettingsTests extends OpenSearchTestCase { + + /** + * Validate proper construction of SearchBackpressureSettings object with a valid mode. + */ + public void testSearchBackpressureSettings() { + Settings settings = Settings.builder().put("search_backpressure.mode", "monitor_only").build(); + ClusterSettings cs = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchBackpressureSettings sbs = new SearchBackpressureSettings(settings, cs); + assertEquals(SearchBackpressureMode.MONITOR_ONLY, sbs.getMode()); + assertEquals(settings, sbs.getSettings()); + assertEquals(cs, sbs.getClusterSettings()); + } + + /** + * Validate construction of SearchBackpressureSettings object gets rejected + * on invalid SearchBackpressureMode value. + */ + public void testSearchBackpressureSettingValidateInvalidMode() { + Settings settings = Settings.builder().put("search_backpressure.mode", "foo").build(); + assertThrows( + IllegalArgumentException.class, + () -> new SearchBackpressureSettings(settings, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ); + } +} diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index bfa97fb1d3fe7..33ad845ea647e 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -135,7 +135,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen); sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive); - sizes.put(ThreadPool.Names.REMOTE_REFRESH, n -> 4 * n); + sizes.put(ThreadPool.Names.REMOTE_REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen); return sizes.get(threadPoolName).apply(numberOfProcessors); } diff --git a/test/framework/src/main/java/org/opensearch/test/FeatureFlagSetter.java b/test/framework/src/main/java/org/opensearch/test/FeatureFlagSetter.java index 26e884e707964..eddcf9c738bb3 100644 --- a/test/framework/src/main/java/org/opensearch/test/FeatureFlagSetter.java +++ b/test/framework/src/main/java/org/opensearch/test/FeatureFlagSetter.java @@ -8,32 +8,57 @@ package org.opensearch.test; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.util.concurrent.ConcurrentCollections; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.Set; /** * Helper class that wraps the lifecycle of setting and finally clearing of - * a {@link org.opensearch.common.util.FeatureFlags} string in an {@link AutoCloseable}. + * a {@link org.opensearch.common.util.FeatureFlags} string. */ -public class FeatureFlagSetter implements AutoCloseable { +public class FeatureFlagSetter { - private final String flag; + private static FeatureFlagSetter INSTANCE = null; - private FeatureFlagSetter(String flag) { - this.flag = flag; + private static synchronized FeatureFlagSetter getInstance() { + if (INSTANCE == null) { + INSTANCE = new FeatureFlagSetter(); + } + return INSTANCE; } + public static synchronized void set(String flag) { + getInstance().setFlag(flag); + } + + public static synchronized void clear() { + if (INSTANCE != null) { + INSTANCE.clearAll(); + INSTANCE = null; + } + } + + private static final Logger LOGGER = LogManager.getLogger(FeatureFlagSetter.class); + private final Set flags = ConcurrentCollections.newConcurrentSet(); + @SuppressForbidden(reason = "Enables setting of feature flags") - public static final FeatureFlagSetter set(String flag) { + private void setFlag(String flag) { + flags.add(flag); AccessController.doPrivileged((PrivilegedAction) () -> System.setProperty(flag, "true")); - return new FeatureFlagSetter(flag); + LOGGER.info("set feature_flag={}", flag); } - @SuppressForbidden(reason = "Clears the set feature flag on close") - @Override - public void close() throws Exception { - AccessController.doPrivileged((PrivilegedAction) () -> System.clearProperty(this.flag)); + @SuppressForbidden(reason = "Clears the set feature flags") + private void clearAll() { + for (String flag : flags) { + AccessController.doPrivileged((PrivilegedAction) () -> System.clearProperty(flag)); + } + LOGGER.info("unset feature_flags={}", flags); + flags.clear(); } } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index aed9064c9b09f..7722b59313b5f 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -103,7 +103,6 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContent; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParser.Token; import org.opensearch.env.Environment; @@ -129,6 +128,7 @@ import org.opensearch.test.junit.listeners.LoggingListener; import org.opensearch.test.junit.listeners.ReproduceInfoPrinter; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; import org.opensearch.transport.nio.MockNioTransportPlugin; import org.joda.time.DateTimeZone; import org.junit.After; @@ -220,6 +220,12 @@ public static void resetPortCounter() { portGenerator.set(0); } + @Override + public void tearDown() throws Exception { + FeatureFlagSetter.clear(); + super.tearDown(); + } + // Allows distinguishing between parallel test processes public static final String TEST_WORKER_VM_ID; @@ -258,6 +264,7 @@ public void append(LogEvent event) { })); BootstrapForTesting.ensureInitialized(); + TransportService.ensureClassloaded(); // ensure server streamables are registered // filter out joda timezones that are deprecated for the java time migration List jodaTZIds = DateTimeZone.getAvailableIDs()