Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
bharath-techie authored May 15, 2023
2 parents 73a2301 + f27d14a commit a0fd8ab
Show file tree
Hide file tree
Showing 13 changed files with 27 additions and 28 deletions.
9 changes: 5 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ buildscript {
opensearch_group = "org.opensearch"
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "3.0.0-SNAPSHOT")
opensearch_plugin_version = System.getProperty("bwc.version", "2.7.0.0")
opensearch_plugin_version = System.getProperty("bwc.version", "2.8.0.0")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
// 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
Expand Down Expand Up @@ -256,16 +256,17 @@ ext.getPluginResource = { download_to_folder, download_from_src ->
}

String baseName = "asynSearchCluster"
String bwcVersion = "2.7.0.0"
String bwcVersionShort = "2.8.0"
String bwcVersion = bwcVersionShort + ".0"
String bwcFilePath = "src/test/resources/org/opensearch/search/asynchronous/bwc/"
String bwcRemoteFile = "https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/2.7.0/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-asynchronous-search-2.7.0.0.zip"
String bwcRemoteFile = "https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/"+ bwcVersionShort + "/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-asynchronous-search-"+ bwcVersion +".zip"
getPluginResource(bwcFilePath + bwcVersion, bwcRemoteFile)
// Creates two test clusters of previous version and loads opensearch plugin of bwcVersion
2.times { i ->
testClusters {
"${baseName}$i" {
testDistribution = "ARCHIVE"
versions = ["2.7.0",opensearch_version]
versions = ["2.8.0",opensearch_version]
numberOfNodes = 3
plugin(provider(new Callable<RegularFile>() {
@Override
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
Expand Down Expand Up @@ -82,7 +81,7 @@ public SearchResponse getSearchResponse() {
asynchronousSearchPersistenceModel.getResponse())));
try (NamedWriteableAwareStreamInput wrapperStreamInput = new NamedWriteableAwareStreamInput(bytesReference.streamInput(),
namedWriteableRegistry)) {
wrapperStreamInput.setVersion(Version.readVersion(wrapperStreamInput));
wrapperStreamInput.setVersion(wrapperStreamInput.readVersion());
return new SearchResponse(wrapperStreamInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to parse search response for asynchronous search [{}] Response : [{}] ",
Expand All @@ -102,7 +101,7 @@ public Exception getSearchError() {
.decode(asynchronousSearchPersistenceModel.getError())));
try (NamedWriteableAwareStreamInput wrapperStreamInput = new NamedWriteableAwareStreamInput(bytesReference.streamInput(),
namedWriteableRegistry)) {
wrapperStreamInput.setVersion(Version.readVersion(wrapperStreamInput));
wrapperStreamInput.setVersion(wrapperStreamInput.readVersion());
return wrapperStreamInput.readException();
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to parse search error for asynchronous search [{}] Error : [{}] ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private String serializeResponse(SearchResponse response) throws IOException {
return null;
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
Version.writeVersion(Version.CURRENT, out);
out.writeVersion(Version.CURRENT);
response.writeTo(out);
byte[] bytes = BytesReference.toBytes(out.bytes());
return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
Expand All @@ -68,7 +68,7 @@ private String serializeError(Exception error) throws IOException {
return null;
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
Version.writeVersion(Version.CURRENT, out);
out.writeVersion(Version.CURRENT);
out.writeException(error instanceof OpenSearchException ? error : new OpenSearchException(error));
byte[] bytes = BytesReference.toBytes(out.bytes());
return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.search.asynchronous.management;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
import org.opensearch.search.asynchronous.plugin.AsynchronousSearchPlugin;
import org.opensearch.search.asynchronous.response.AcknowledgedResponse;
Expand All @@ -23,7 +24,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Randomness;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -32,7 +32,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.search.asynchronous.settings.LegacyOpendistroAsynchronousSearchSettings;
import org.opensearch.tasks.Task;
Expand All @@ -46,6 +45,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -196,8 +196,8 @@ public final void performCleanUp() {
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
ImmutableOpenMap<String, DiscoveryNode> dataNodes = clusterService.state().nodes().getDataNodes();
List<DiscoveryNode> nodes = Stream.of(dataNodes.values().toArray(DiscoveryNode.class))
final Map<String, DiscoveryNode> dataNodes = clusterService.state().nodes().getDataNodes();
List<DiscoveryNode> nodes = Stream.of(dataNodes.values().toArray(new DiscoveryNode[0]))
.collect(Collectors.toList());
if (nodes == null || nodes.isEmpty()) {
logger.debug("Found empty data nodes with asynchronous search enabled attribute [{}] for response clean up", dataNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.search.asynchronous.service;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.asynchronous.context.persistence.AsynchronousSearchPersistenceModel;
Expand All @@ -30,7 +31,6 @@
import org.opensearch.common.io.stream.NotSerializableExceptionWrapper;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.engine.DocumentMissingException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.search.asynchronous.context.active;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContextId;
import org.opensearch.search.asynchronous.listener.AsynchronousSearchContextEventListener;
import org.opensearch.search.asynchronous.listener.AsynchronousSearchProgressListener;
Expand All @@ -18,7 +19,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ExecutorBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.search.asynchronous.context.permits;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContextId;
import org.opensearch.search.asynchronous.plugin.AsynchronousSearchPlugin;
import org.opensearch.action.ActionListener;
Expand All @@ -15,7 +16,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.search.asynchronous.integTests;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.search.asynchronous.action.DeleteAsynchronousSearchAction;
import org.opensearch.search.asynchronous.action.SubmitAsynchronousSearchAction;
import org.opensearch.search.asynchronous.commons.AsynchronousSearchIntegTestCase;
Expand All @@ -30,7 +31,6 @@
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.InternalAggregation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public void testNodewiseStats() throws InterruptedException {
submitAsynchronousSearchRequest.waitForCompletionTimeout(TimeValue.timeValueSeconds(2));
submitAsynchronousSearchRequest.keepOnCompletion(true);
List<DiscoveryNode> dataNodes = new LinkedList<>();
clusterService().state().nodes().getDataNodes().iterator().forEachRemaining(node -> {
dataNodes.add(node.value);
clusterService().state().nodes().getDataNodes().values().iterator().forEachRemaining(node -> {
dataNodes.add(node);
});
assertFalse(dataNodes.isEmpty());
DiscoveryNode randomDataNode = dataNodes.get(randomInt(dataNodes.size() - 1));
Expand Down Expand Up @@ -118,8 +118,8 @@ public void testStatsAcrossNodes() throws InterruptedException, ExecutionExcepti
client().prepareIndex(index).setId("3").setSource("field1", "quick"));

List<DiscoveryNode> dataNodes = new LinkedList<>();
clusterService().state().nodes().getDataNodes().iterator().forEachRemaining(node -> {
dataNodes.add(node.value);
clusterService().state().nodes().getDataNodes().values().iterator().forEachRemaining(node -> {
dataNodes.add(node);
});
assertFalse(dataNodes.isEmpty());
int numThreads = 20;
Expand Down Expand Up @@ -243,8 +243,8 @@ public void testThrottledAsynchronousSearchCount() throws InterruptedException,
client().prepareIndex(index).setId("3").setSource("field1", "quick"));

List<DiscoveryNode> dataNodes = new LinkedList<>();
clusterService().state().nodes().getDataNodes().iterator().forEachRemaining(node -> {
dataNodes.add(node.value);
clusterService().state().nodes().getDataNodes().values().iterator().forEachRemaining(node -> {
dataNodes.add(node);
});
assertFalse(dataNodes.isEmpty());
DiscoveryNode randomDataNode = dataNodes.get(randomInt(dataNodes.size() - 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.search.asynchronous.integTests;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.search.asynchronous.commons.AsynchronousSearchSingleNodeTestCase;
import org.opensearch.search.asynchronous.context.active.AsynchronousSearchActiveStore;
import org.opensearch.search.asynchronous.id.AsynchronousSearchId;
Expand All @@ -24,7 +25,6 @@
import org.opensearch.common.TriConsumer;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.opensearch.search.asynchronous.service.AsynchronousSearchPersistenceService;
import org.opensearch.search.asynchronous.response.AcknowledgedResponse;
import org.opensearch.search.asynchronous.service.AsynchronousSearchService;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -197,8 +196,8 @@ private static IndexMetadata createIndexMetadata(final Index index, final long v
// Create the routing table for a cluster state.
private static RoutingTable createRoutingTable(final long version, final Metadata metadata) {
final RoutingTable.Builder builder = RoutingTable.builder().version(version);
for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
builder.addAsNew(cursor.value);
for (final IndexMetadata indexMetadata : metadata.indices().values()) {
builder.addAsNew(indexMetadata);
}
return builder.build();
}
Expand Down
Binary file not shown.

0 comments on commit a0fd8ab

Please sign in to comment.