diff --git a/.github/workflows/gradle-check.yml b/.github/workflows/gradle-check.yml index dec5ee15d0bea..cbaa7fa10fbb6 100644 --- a/.github/workflows/gradle-check.yml +++ b/.github/workflows/gradle-check.yml @@ -2,9 +2,9 @@ name: Gradle Check (Jenkins) on: push: branches-ignore: - - 'backport/*' - - 'create-pull-request/*' - - 'dependabot/*' + - 'backport/**' + - 'create-pull-request/**' + - 'dependabot/**' pull_request_target: types: [opened, synchronize, reopened] diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchCluster.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchCluster.java index ef52adab6377a..0f5348d5a8dcf 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchCluster.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchCluster.java @@ -84,6 +84,8 @@ public class OpenSearchCluster implements TestClusterConfiguration, Named { private final ArchiveOperations archiveOperations; private int nodeIndex = 0; + private int zoneCount = 1; + public OpenSearchCluster( String clusterName, Project project, @@ -104,13 +106,21 @@ public OpenSearchCluster( this.bwcJdk = bwcJdk; // Always add the first node - addNode(clusterName + "-0"); + String zone = hasZoneProperty() ? "zone-1" : ""; + addNode(clusterName + "-0", zone); // configure the cluster name eagerly so all nodes know about it this.nodes.all((node) -> node.defaultConfig.put("cluster.name", safeName(clusterName))); addWaitForClusterHealth(); } + public void setNumberOfZones(int zoneCount) { + if (zoneCount < 1) { + throw new IllegalArgumentException("Number of zones should be >= 1 but was " + zoneCount + " for " + this); + } + this.zoneCount = zoneCount; + } + public void setNumberOfNodes(int numberOfNodes) { checkFrozen(); @@ -124,12 +134,31 @@ public void setNumberOfNodes(int numberOfNodes) { ); } - for (int i = nodes.size(); i < numberOfNodes; i++) { - addNode(clusterName + "-" + i); + if (numberOfNodes < zoneCount) { + throw new IllegalArgumentException( + "Number of nodes should be >= zoneCount but was " + numberOfNodes + " for " + this.zoneCount + ); } + + if (hasZoneProperty()) { + int currentZone; + for (int i = nodes.size(); i < numberOfNodes; i++) { + currentZone = i % zoneCount + 1; + String zoneName = "zone-" + currentZone; + addNode(clusterName + "-" + i, zoneName); + } + } else { + for (int i = nodes.size(); i < numberOfNodes; i++) { + addNode(clusterName + "-" + i, ""); + } + } + } + + private boolean hasZoneProperty() { + return this.project.findProperty("numZones") != null; } - private void addNode(String nodeName) { + private void addNode(String nodeName, String zoneName) { OpenSearchNode newNode = new OpenSearchNode( path, nodeName, @@ -138,7 +167,8 @@ private void addNode(String nodeName) { fileSystemOperations, archiveOperations, workingDirBase, - bwcJdk + bwcJdk, + zoneName ); // configure the cluster name eagerly newNode.defaultConfig.put("cluster.name", safeName(clusterName)); diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java index b051c15e81d6d..ab765efde7885 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java @@ -32,6 +32,7 @@ package org.opensearch.gradle.testclusters; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; import org.opensearch.gradle.Architecture; import org.opensearch.gradle.DistributionDownloadPlugin; import org.opensearch.gradle.OpenSearchDistribution; @@ -175,6 +176,8 @@ public class OpenSearchNode implements TestClusterConfiguration { private final Config legacyESConfig; private Config currentConfig; + private String zone; + OpenSearchNode( String path, String name, @@ -183,7 +186,8 @@ public class OpenSearchNode implements TestClusterConfiguration { FileSystemOperations fileSystemOperations, ArchiveOperations archiveOperations, File workingDirBase, - Jdk bwcJdk + Jdk bwcJdk, + String zone ) { this.path = path; this.name = name; @@ -205,6 +209,7 @@ public class OpenSearchNode implements TestClusterConfiguration { opensearchConfig = Config.getOpenSearchConfig(workingDir); legacyESConfig = Config.getLegacyESConfig(workingDir); currentConfig = opensearchConfig; + this.zone = zone; } /* @@ -1239,6 +1244,10 @@ private void createConfiguration() { baseConfig.put("path.logs", confPathLogs.toAbsolutePath().toString()); baseConfig.put("path.shared_data", workingDir.resolve("sharedData").toString()); baseConfig.put("node.attr.testattr", "test"); + if (StringUtils.isNotBlank(zone)) { + baseConfig.put("cluster.routing.allocation.awareness.attributes", "zone"); + baseConfig.put("node.attr.zone", zone); + } baseConfig.put("node.portsfile", "true"); baseConfig.put("http.port", httpPort); if (getVersion().onOrAfter(Version.fromString("6.7.0"))) { diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index efcc13921c398..3da0f81023f72 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -885,7 +885,8 @@ public void testApiNamingConventions() throws Exception { "nodes.hot_threads", "nodes.usage", "nodes.reload_secure_settings", - "search_shards", }; + "search_shards", + "remote_store.restore", }; List booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password"); Set deprecatedMethods = new HashSet<>(); deprecatedMethods.add("indices.force_merge"); diff --git a/distribution/docker/src/docker/config/log4j2.properties b/distribution/docker/src/docker/config/log4j2.properties index a8c54137c7fd2..761478a9fdc6e 100644 --- a/distribution/docker/src/docker/config/log4j2.properties +++ b/distribution/docker/src/docker/config/log4j2.properties @@ -53,3 +53,13 @@ logger.index_indexing_slowlog.name = index.indexing.slowlog.index logger.index_indexing_slowlog.level = trace logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling logger.index_indexing_slowlog.additivity = false + +appender.task_detailslog_rolling.type = Console +appender.task_detailslog_rolling.name = task_detailslog_rolling +appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout +appender.task_detailslog_rolling.layout.type_name = task_detailslog + +logger.task_detailslog_rolling.name = task.detailslog +logger.task_detailslog_rolling.level = trace +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling +logger.task_detailslog_rolling.additivity = false diff --git a/distribution/src/config/log4j2.properties b/distribution/src/config/log4j2.properties index 4820396c79eb7..bb27aaf2e22e6 100644 --- a/distribution/src/config/log4j2.properties +++ b/distribution/src/config/log4j2.properties @@ -195,3 +195,40 @@ logger.index_indexing_slowlog.level = trace logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old logger.index_indexing_slowlog.additivity = false + +######## Task details log JSON #################### +appender.task_detailslog_rolling.type = RollingFile +appender.task_detailslog_rolling.name = task_detailslog_rolling +appender.task_detailslog_rolling.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.json +appender.task_detailslog_rolling.filePermissions = rw-r----- +appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout +appender.task_detailslog_rolling.layout.type_name = task_detailslog +appender.task_detailslog_rolling.layout.opensearchmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata + +appender.task_detailslog_rolling.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.json.gz +appender.task_detailslog_rolling.policies.type = Policies +appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling.policies.size.size = 1GB +appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling.strategy.max = 4 +################################################# +######## Task details log - old style pattern #### +appender.task_detailslog_rolling_old.type = RollingFile +appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old +appender.task_detailslog_rolling_old.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.log +appender.task_detailslog_rolling_old.filePermissions = rw-r----- +appender.task_detailslog_rolling_old.layout.type = PatternLayout +appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n + +appender.task_detailslog_rolling_old.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.log.gz +appender.task_detailslog_rolling_old.policies.type = Policies +appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling_old.policies.size.size = 1GB +appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling_old.strategy.max = 4 +################################################# +logger.task_detailslog_rolling.name = task.detailslog +logger.task_detailslog_rolling.level = trace +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old +logger.task_detailslog_rolling.additivity = false diff --git a/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java b/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java index 7f67e08c66b9e..96544d3297ad4 100644 --- a/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java +++ b/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java @@ -67,7 +67,7 @@ public void testImportLog4jPropertiesTask() throws IOException { Properties properties = new Properties(); properties.load(Files.newInputStream(taskInput.getOpenSearchConfig().resolve(ImportLog4jPropertiesTask.LOG4J_PROPERTIES))); assertThat(properties, is(notNullValue())); - assertThat(properties.entrySet(), hasSize(137)); + assertThat(properties.entrySet(), hasSize(165)); assertThat(properties.get("appender.rolling.layout.type"), equalTo("OpenSearchJsonLayout")); assertThat( properties.get("appender.deprecation_rolling.fileName"), diff --git a/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties b/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties index b9ad71121165a..4b92d3fc62376 100644 --- a/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties +++ b/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties @@ -176,3 +176,38 @@ logger.index_indexing_slowlog.level = trace logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old logger.index_indexing_slowlog.additivity = false + +######## Task details log JSON #################### +appender.task_detailslog_rolling.type = RollingFile +appender.task_detailslog_rolling.name = task_detailslog_rolling +appender.task_detailslog_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.json +appender.task_detailslog_rolling.layout.type = ESJsonLayout +appender.task_detailslog_rolling.layout.type_name = task_detailslog +appender.task_detailslog_rolling.layout.esmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata + +appender.task_detailslog_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.json.gz +appender.task_detailslog_rolling.policies.type = Policies +appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling.policies.size.size = 1GB +appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling.strategy.max = 4 +################################################# +######## Task details log - old style pattern #### +appender.task_detailslog_rolling_old.type = RollingFile +appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old +appender.task_detailslog_rolling_old.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.log +appender.task_detailslog_rolling_old.layout.type = PatternLayout +appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n + +appender.task_detailslog_rolling_old.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.log.gz +appender.task_detailslog_rolling_old.policies.type = Policies +appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling_old.policies.size.size = 1GB +appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling_old.strategy.max = 4 +################################################# +logger.task_detailslog_rolling.name = task.detailslog +logger.task_detailslog_rolling.level = trace +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old +logger.task_detailslog_rolling.additivity = false diff --git a/gradle/run.gradle b/gradle/run.gradle index 5a1fed06c0ef7..639479e97d28f 100644 --- a/gradle/run.gradle +++ b/gradle/run.gradle @@ -31,9 +31,14 @@ import org.opensearch.gradle.testclusters.RunTask apply plugin: 'opensearch.testclusters' +def numNodes = findProperty('numNodes') as Integer ?: 1 +def numZones = findProperty('numZones') as Integer ?: 1 + testClusters { runTask { testDistribution = 'archive' + if (numZones > 1) numberOfZones = numZones + if (numNodes > 1) numberOfNodes = numNodes } } diff --git a/release-notes/opensearch.release-notes-2.2.0.md b/release-notes/opensearch.release-notes-2.2.0.md new file mode 100644 index 0000000000000..74e76cfe46b5a --- /dev/null +++ b/release-notes/opensearch.release-notes-2.2.0.md @@ -0,0 +1,79 @@ +## 2022-08-05 Version 2.2.0 Release Notes + +### Features/Enhancements + +* Task consumer Integration ([#2293](https://github.com/opensearch-project/opensearch/pull/2293)) ([#4141](https://github.com/opensearch-project/opensearch/pull/4141)) +* [Backport 2.x] [Segment Replication] Add SegmentReplicationTargetService to orchestrate replication events. ([#4074](https://github.com/opensearch-project/opensearch/pull/4074)) +* Support task resource tracking in OpenSearch ([#3982](https://github.com/opensearch-project/opensearch/pull/3982)) ([#4087](https://github.com/opensearch-project/opensearch/pull/4087)) +* Making shard copy count a multiple of attribute count ([#3462](https://github.com/opensearch-project/opensearch/pull/3462)) ([#4086](https://github.com/opensearch-project/opensearch/pull/4086)) +* [Backport 2.x] [Segment Rreplication] Adding CheckpointRefreshListener to trigger when Segment replication is turned on and Primary shard refreshes ([#4044](https://github.com/opensearch-project/opensearch/pull/4044)) +* Add doc_count field mapper ([#3985](https://github.com/opensearch-project/opensearch/pull/3985)) ([#4037](https://github.com/opensearch-project/opensearch/pull/4037)) +* Parallelize stale blobs deletion during snapshot delete ([#3796](https://github.com/opensearch-project/opensearch/pull/3796)) ([#3990](https://github.com/opensearch-project/opensearch/pull/3990)) +* [Backport 2.x] [Segment Replication] Add a new Engine implementation for replicas with segment replication enabled. ([#4003](https://github.com/opensearch-project/opensearch/pull/4003)) +* [Backport 2.x] Adds a new parameter, max_analyzer_offset, for the highlighter ([#4031](https://github.com/opensearch-project/opensearch/pull/4031)) +* Update merge on refresh and merge on commit defaults in Opensearch (Lucene 9.3) ([#3561](https://github.com/opensearch-project/opensearch/pull/3561)) ([#4013](https://github.com/opensearch-project/opensearch/pull/4013)) +* Make HybridDirectory MMAP Extensions Configurable ([#3837](https://github.com/opensearch-project/opensearch/pull/3837)) ([#3970](https://github.com/opensearch-project/opensearch/pull/3970)) +* Add option to disable chunked transfer-encoding ([#3864](https://github.com/opensearch-project/opensearch/pull/3864)) ([#3885](https://github.com/opensearch-project/opensearch/pull/3885)) +* Introducing TranslogManager implementations decoupled from the Engine [2.x] ([#3820](https://github.com/opensearch-project/opensearch/pull/3820)) +* Changing default no_master_block from write to metadata_write ([#3621](https://github.com/opensearch-project/opensearch/pull/3621)) ([#3756](https://github.com/opensearch-project/opensearch/pull/3756)) + +### Bug Fixes + +* OpenSearch crashes on closed client connection before search reply when total ops higher compared to expected ([#4143](https://github.com/opensearch-project/opensearch/pull/4143)) ([#4145](https://github.com/opensearch-project/opensearch/pull/4145)) +* Binding empty instance of SegmentReplicationCheckpointPublisher when Feature Flag is off in IndicesModule.java file. ([#4119](https://github.com/opensearch-project/opensearch/pull/4119)) +* Fix the bug that masterOperation(with task param) is bypassed ([#4103](https://github.com/opensearch-project/opensearch/pull/4103)) ([#4115](https://github.com/opensearch-project/opensearch/pull/4115)) +* Fixing flaky org.opensearch.cluster.routing.allocation.decider.DiskThresholdDeciderIT.testHighWatermarkNotExceeded test case ([#4012](https://github.com/opensearch-project/opensearch/pull/4012)) ([#4014](https://github.com/opensearch-project/opensearch/pull/4014)) +* Correct typo: Rutime -> Runtime ([#3896](https://github.com/opensearch-project/opensearch/pull/3896)) ([#3898](https://github.com/opensearch-project/opensearch/pull/3898)) +* Fixing implausibly old time stamp 1970-01-01 00:00:00 by using the timestamp from the Git revision instead of default 0 value ([#3883](https://github.com/opensearch-project/opensearch/pull/3883)) ([#3891](https://github.com/opensearch-project/opensearch/pull/3891)) + +### Infrastructure + +* Correctly ignore depandabot branches during push ([#4077](https://github.com/opensearch-project/opensearch/pull/4077)) ([#4113](https://github.com/opensearch-project/opensearch/pull/4113)) +* Build performance improvements ([#3926](https://github.com/opensearch-project/opensearch/pull/3926)) ([#3937](https://github.com/opensearch-project/opensearch/pull/3937)) +* PR coverage requirement and default settings ([#3931](https://github.com/opensearch-project/opensearch/pull/3931)) ([#3938](https://github.com/opensearch-project/opensearch/pull/3938)) +* [Backport 2.x] Fail build on wildcard imports ([#3940](https://github.com/opensearch-project/opensearch/pull/3940)) +* Don't run EmptyDirTaskTests in a Docker container ([#3792](https://github.com/opensearch-project/opensearch/pull/3792)) ([#3912](https://github.com/opensearch-project/opensearch/pull/3912)) +* Add coverage, gha, jenkins server, documentation and forum badges ([#3886](https://github.com/opensearch-project/opensearch/pull/3886)) +* Unable to use Systemd module with tar distribution ([#3755](https://github.com/opensearch-project/opensearch/pull/3755)) ([#3903](https://github.com/opensearch-project/opensearch/pull/3903)) +* Ignore backport / autocut / dependentbot branches for gradle checks ([#3816](https://github.com/opensearch-project/opensearch/pull/3816)) ([#3825](https://github.com/opensearch-project/opensearch/pull/3825)) +* Setup branch push coverage and fix coverage uploads ([#3793](https://github.com/opensearch-project/opensearch/pull/3793)) ([#3811](https://github.com/opensearch-project/opensearch/pull/3811)) +* Enable XML test reports for Jenkins integration ([#3799](https://github.com/opensearch-project/opensearch/pull/3799)) ([#3803](https://github.com/opensearch-project/opensearch/pull/3803)) + +### Maintenance + +* OpenJDK Update (July 2022 Patch releases) ([#4023](https://github.com/opensearch-project/opensearch/pull/4023)) ([#4092](https://github.com/opensearch-project/opensearch/pull/4092)) +* Update to Lucene 9.3.0 ([#4043](https://github.com/opensearch-project/opensearch/pull/4043)) ([#4088](https://github.com/opensearch-project/opensearch/pull/4088)) +* Bump commons-configuration2 from 2.7 to 2.8.0 in /plugins/repository-hdfs ([#3764](https://github.com/opensearch-project/opensearch/pull/3764)) ([#3783](https://github.com/opensearch-project/opensearch/pull/3783)) +* Use bash in systemd-entrypoint shebang ([#4008](https://github.com/opensearch-project/opensearch/pull/4008)) ([#4009](https://github.com/opensearch-project/opensearch/pull/4009)) +* Bump com.gradle.enterprise from 3.10.1 to 3.10.2 ([#3568](https://github.com/opensearch-project/opensearch/pull/3568)) ([#3934](https://github.com/opensearch-project/opensearch/pull/3934)) +* Bump log4j-core in /buildSrc/src/testKit/thirdPartyAudit/sample_jars ([#3763](https://github.com/opensearch-project/opensearch/pull/3763)) ([#3784](https://github.com/opensearch-project/opensearch/pull/3784)) +* Added bwc version 1.3.5 ([#3911](https://github.com/opensearch-project/opensearch/pull/3911)) ([#3913](https://github.com/opensearch-project/opensearch/pull/3913)) +* Update to Gradle 7.5 ([#3594](https://github.com/opensearch-project/opensearch/pull/3594)) ([#3904](https://github.com/opensearch-project/opensearch/pull/3904)) +* Update Netty to 4.1.79.Final ([#3868](https://github.com/opensearch-project/opensearch/pull/3868)) ([#3874](https://github.com/opensearch-project/opensearch/pull/3874)) +* Upgrade MinIO image version ([#3541](https://github.com/opensearch-project/opensearch/pull/3541)) ([#3867](https://github.com/opensearch-project/opensearch/pull/3867)) +* Add netty-transport-native-unix-common to modules/transport-netty4/bu… ([#3848](https://github.com/opensearch-project/opensearch/pull/3848)) ([#3853](https://github.com/opensearch-project/opensearch/pull/3853)) +* Update outdated dependencies ([#3821](https://github.com/opensearch-project/opensearch/pull/3821)) ([#3854](https://github.com/opensearch-project/opensearch/pull/3854)) +* Added bwc version 2.1.1 ([#3806](https://github.com/opensearch-project/opensearch/pull/3806)) +* Upgrade netty from 4.1.73.Final to 4.1.78.Final ([#3772](https://github.com/opensearch-project/opensearch/pull/3772)) ([#3778](https://github.com/opensearch-project/opensearch/pull/3778)) +* Bump protobuf-java from 3.21.1 to 3.21.2 in /plugins/repository-hdfs ([#3711](https://github.com/opensearch-project/opensearch/pull/3711)) ([#3726](https://github.com/opensearch-project/opensearch/pull/3726)) +* Upgrading AWS SDK dependency for native plugins ([#3694](https://github.com/opensearch-project/opensearch/pull/3694)) ([#3701](https://github.com/opensearch-project/opensearch/pull/3701)) + +### Refactoring + +* [Backport 2.x] Changes to encapsulate Translog into TranslogManager ([#4095](https://github.com/opensearch-project/opensearch/pull/4095)) ([#4142](https://github.com/opensearch-project/opensearch/pull/4142)) +* Deprecate and rename abstract methods in interfaces that contain 'master' in name ([#4121](https://github.com/opensearch-project/opensearch/pull/4121)) ([#4123](https://github.com/opensearch-project/opensearch/pull/4123)) +* [Backport 2.x] Integrate Engine with decoupled Translog interfaces ([#3822](https://github.com/opensearch-project/opensearch/pull/3822)) +* Deprecate class FakeThreadPoolMasterService, BlockMasterServiceOnMaster and BusyMasterServiceDisruption ([#4058](https://github.com/opensearch-project/opensearch/pull/4058)) ([#4068](https://github.com/opensearch-project/opensearch/pull/4068)) +* Rename classes with name 'MasterService' to 'ClusterManagerService' in directory 'test/framework' ([#4051](https://github.com/opensearch-project/opensearch/pull/4051)) ([#4057](https://github.com/opensearch-project/opensearch/pull/4057)) +* Deprecate class 'MasterService' and create alternative class 'ClusterManagerService' ([#4022](https://github.com/opensearch-project/opensearch/pull/4022)) ([#4050](https://github.com/opensearch-project/opensearch/pull/4050)) +* Deprecate and Rename abstract methods from 'Master' terminology to 'ClusterManager'. ([#4032](https://github.com/opensearch-project/opensearch/pull/4032)) ([#4048](https://github.com/opensearch-project/opensearch/pull/4048)) +* Deprecate public methods and variables that contain 'master' terminology in class 'NoMasterBlockService' and 'MasterService' ([#4006](https://github.com/opensearch-project/opensearch/pull/4006)) ([#4038](https://github.com/opensearch-project/opensearch/pull/4038)) +* Deprecate public methods and variables that contain 'master' terminology in 'client' directory ([#3966](https://github.com/opensearch-project/opensearch/pull/3966)) ([#3981](https://github.com/opensearch-project/opensearch/pull/3981)) +* [segment replication]Introducing common Replication interfaces for segment replication and recovery code paths ([#3234](https://github.com/opensearch-project/opensearch/pull/3234)) ([#3984](https://github.com/opensearch-project/opensearch/pull/3984)) +* Deprecate public methods and variables that contain 'master' terminology in 'test/framework' directory ([#3978](https://github.com/opensearch-project/opensearch/pull/3978)) ([#3987](https://github.com/opensearch-project/opensearch/pull/3987)) +* [Backport 2.x] [Segment Replication] Moving RecoveryState.Index to a top-level class and renaming ([#3971](https://github.com/opensearch-project/opensearch/pull/3971)) +* Rename and deprecate public methods that contains 'master' in the name in 'server' directory ([#3647](https://github.com/opensearch-project/opensearch/pull/3647)) ([#3964](https://github.com/opensearch-project/opensearch/pull/3964)) +* [2.x] Deprecate public class names with master terminology ([#3871](https://github.com/opensearch-project/opensearch/pull/3871)) ([#3914](https://github.com/opensearch-project/opensearch/pull/3914)) +* [Backport 2.x] Rename public classes with 'Master' to 'ClusterManager' ([#3870](https://github.com/opensearch-project/opensearch/pull/3870)) +* Revert renaming masterOperation() to clusterManagerOperation() ([#3681](https://github.com/opensearch-project/opensearch/pull/3681)) ([#3714](https://github.com/opensearch-project/opensearch/pull/3714)) +* Revert renaming method onMaster() and offMaster() in interface LocalNodeMasterListener ([#3686](https://github.com/opensearch-project/opensearch/pull/3686)) ([#3693](https://github.com/opensearch-project/opensearch/pull/3693)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/remote_store.restore.json b/rest-api-spec/src/main/resources/rest-api-spec/api/remote_store.restore.json new file mode 100644 index 0000000000000..6af49f75b9f6e --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/remote_store.restore.json @@ -0,0 +1,34 @@ +{ + "remote_store.restore":{ + "documentation":{ + "url": "https://opensearch.org/docs/latest/opensearch/rest-api/remote-store#restore", + "description":"Restores from remote store." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_remotestore/_restore", + "methods":[ + "POST" + ] + } + ] + }, + "params":{ + "cluster_manager_timeout":{ + "type":"time", + "description":"Explicit operation timeout for connection to cluster-manager node" + }, + "wait_for_completion":{ + "type":"boolean", + "description":"Should this request wait until the operation has completed before returning", + "default":false + } + }, + "body":{ + "description":"A comma separated list of index IDs", + "required":true + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java b/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java index e38b128c04fde..a75057356fe8a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java @@ -42,7 +42,10 @@ import org.opensearch.action.support.WriteRequest.RefreshPolicy; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.rest.RestStatus; import org.opensearch.script.MockScriptPlugin; @@ -74,7 +77,11 @@ public class WaitUntilRefreshIT extends OpenSearchIntegTestCase { @Override public Settings indexSettings() { // Use a shorter refresh interval to speed up the tests. We'll be waiting on this interval several times. - return Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms").build(); + final Settings.Builder builder = Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms"); + if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { + builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + } + return builder.build(); } @Before diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 2f470d7603869..052d2ec2b5764 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -61,6 +61,8 @@ import org.opensearch.action.admin.cluster.node.usage.TransportNodesUsageAction; import org.opensearch.action.admin.cluster.remote.RemoteInfoAction; import org.opensearch.action.admin.cluster.remote.TransportRemoteInfoAction; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction; +import org.opensearch.action.admin.cluster.remotestore.restore.TransportRestoreRemoteStoreAction; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; import org.opensearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction; import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction; @@ -267,6 +269,7 @@ import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.seqno.RetentionLeaseActions; import org.opensearch.indices.SystemIndices; import org.opensearch.indices.breaker.CircuitBreakerService; @@ -314,6 +317,7 @@ import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction; import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction; import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction; +import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction; import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction; import org.opensearch.rest.action.admin.cluster.RestSnapshotsStatusAction; import org.opensearch.rest.action.admin.cluster.RestVerifyRepositoryAction; @@ -668,6 +672,9 @@ public void reg actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); + // Remote Store + actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class); + return unmodifiableMap(actions.getRegistry()); } @@ -853,6 +860,11 @@ public void initRestHandlers(Supplier nodesInCluster) { } } registerHandler.accept(new RestCatAction(catActions)); + + // Remote Store APIs + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + registerHandler.accept(new RestRestoreRemoteStoreAction()); + } } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreAction.java new file mode 100644 index 0000000000000..46b1bc14e8537 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.action.ActionType; + +/** + * Restore remote store action + * + * @opensearch.internal + */ +public final class RestoreRemoteStoreAction extends ActionType { + + public static final RestoreRemoteStoreAction INSTANCE = new RestoreRemoteStoreAction(); + public static final String NAME = "cluster:admin/remotestore/restore"; + + private RestoreRemoteStoreAction() { + super(NAME, RestoreRemoteStoreResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponse.java new file mode 100644 index 0000000000000..80bf96b6b2562 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponse.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.Nullable; +import org.opensearch.common.ParseField; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ConstructingObjectParser; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.rest.RestStatus; +import org.opensearch.snapshots.RestoreInfo; + +import java.io.IOException; +import java.util.Objects; + +import static org.opensearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * Contains information about remote store restores + * + * @opensearch.internal + */ +public final class RestoreRemoteStoreResponse extends ActionResponse implements ToXContentObject { + + @Nullable + private final RestoreInfo restoreInfo; + + public RestoreRemoteStoreResponse(@Nullable RestoreInfo restoreInfo) { + this.restoreInfo = restoreInfo; + } + + public RestoreRemoteStoreResponse(StreamInput in) throws IOException { + super(in); + restoreInfo = RestoreInfo.readOptionalRestoreInfo(in); + } + + /** + * Returns restore information if remote store restore was completed before this method returned, null otherwise + * + * @return restore information or null + */ + public RestoreInfo getRestoreInfo() { + return restoreInfo; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(restoreInfo); + } + + public RestStatus status() { + if (restoreInfo == null) { + return RestStatus.ACCEPTED; + } + return restoreInfo.status(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (restoreInfo != null) { + builder.field("remote_store"); + restoreInfo.toXContent(builder, params); + } else { + builder.field("accepted", true); + } + builder.endObject(); + return builder; + } + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "restore_remote_store", + true, + v -> { + RestoreInfo restoreInfo = (RestoreInfo) v[0]; + Boolean accepted = (Boolean) v[1]; + assert (accepted == null && restoreInfo != null) || (accepted != null && accepted && restoreInfo == null) : "accepted: [" + + accepted + + "], restoreInfo: [" + + restoreInfo + + "]"; + return new RestoreRemoteStoreResponse(restoreInfo); + } + ); + + static { + PARSER.declareObject( + optionalConstructorArg(), + (parser, context) -> RestoreInfo.fromXContent(parser), + new ParseField("remote_store") + ); + PARSER.declareBoolean(optionalConstructorArg(), new ParseField("accepted")); + } + + public static RestoreRemoteStoreResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RestoreRemoteStoreResponse that = (RestoreRemoteStoreResponse) o; + return Objects.equals(restoreInfo, that.restoreInfo); + } + + @Override + public int hashCode() { + return Objects.hash(restoreInfo); + } + + @Override + public String toString() { + return "RestoreRemoteStoreResponse{" + "restoreInfo=" + restoreInfo + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java new file mode 100644 index 0000000000000..7304ba25717ac --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.snapshots.RestoreService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Transport action for restore remote store operation + * + * @opensearch.internal + */ +public final class TransportRestoreRemoteStoreAction extends TransportClusterManagerNodeAction< + RestoreRemoteStoreRequest, + RestoreRemoteStoreResponse> { + private final RestoreService restoreService; + + @Inject + public TransportRestoreRemoteStoreAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + RestoreService restoreService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + RestoreRemoteStoreAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + RestoreRemoteStoreRequest::new, + indexNameExpressionResolver + ); + this.restoreService = restoreService; + } + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + protected RestoreRemoteStoreResponse read(StreamInput in) throws IOException { + return new RestoreRemoteStoreResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(RestoreRemoteStoreRequest request, ClusterState state) { + // Restoring a remote store might change the global state and create/change an index, + // so we need to check for METADATA_WRITE and WRITE blocks + ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + if (blockException != null) { + return blockException; + } + return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + + } + + @Override + protected void clusterManagerOperation( + final RestoreRemoteStoreRequest request, + final ClusterState state, + final ActionListener listener + ) { + restoreService.restoreFromRemoteStore( + request, + ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> { + if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { + RestoreClusterStateListener.createAndRegisterListener( + clusterService, + restoreCompletionResponse, + delegatedListener, + RestoreRemoteStoreResponse::new + ); + } else { + delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo())); + } + }) + ); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java index 363b7179f3c6c..10348f5ccfe6e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java @@ -6,5 +6,5 @@ * compatible open source license. */ -/** Restore Snapshot transport handler. */ +/** Restore remote store transport handler. */ package org.opensearch.action.admin.cluster.remotestore.restore; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java index 7d2ca99e3dbf5..d0f78e85e26a5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionResponse; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.RestoreInProgress; @@ -44,6 +45,8 @@ import org.opensearch.snapshots.RestoreInfo; import org.opensearch.snapshots.RestoreService; +import java.util.function.Function; + import static org.opensearch.snapshots.RestoreService.restoreInProgress; /** @@ -51,22 +54,27 @@ * * @opensearch.internal */ -public class RestoreClusterStateListener implements ClusterStateListener { +public class RestoreClusterStateListener implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class); private final ClusterService clusterService; private final String uuid; - private final ActionListener listener; + private final String restoreIdentifier; + private final ActionListener listener; + private final Function actionResponseFactory; private RestoreClusterStateListener( ClusterService clusterService, RestoreService.RestoreCompletionResponse response, - ActionListener listener + ActionListener listener, + Function actionResponseFactory ) { this.clusterService = clusterService; this.uuid = response.getUuid(); + this.restoreIdentifier = response.getSnapshot() != null ? response.getSnapshot().getSnapshotId().getName() : "remote_store"; this.listener = listener; + this.actionResponseFactory = actionResponseFactory; } @Override @@ -78,23 +86,23 @@ public void clusterChanged(ClusterChangedEvent changedEvent) { // on the current cluster-manager and as such it might miss some intermediary cluster states due to batching. // Clean up listener in that case and acknowledge completion of restore operation to client. clusterService.removeListener(this); - listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null)); + listener.onResponse(actionResponseFactory.apply(null)); } else if (newEntry == null) { clusterService.removeListener(this); ImmutableOpenMap shards = prevEntry.shards(); - assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); + assert prevEntry.state().completed() : "expected completed snapshot/remote store restore state but was " + prevEntry.state(); assert RestoreService.completed(shards) : "expected all restore entries to be completed"; RestoreInfo ri = new RestoreInfo( - prevEntry.snapshot().getSnapshotId().getName(), + restoreIdentifier, prevEntry.indices(), shards.size(), shards.size() - RestoreService.failedShards(shards) ); - RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); - logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId()); + T response = actionResponseFactory.apply(ri); + logger.debug("restore of [{}] completed", restoreIdentifier); listener.onResponse(response); } else { - // restore not completed yet, wait for next cluster state update + logger.debug("restore not completed yet, wait for next cluster state update"); } } @@ -102,11 +110,12 @@ public void clusterChanged(ClusterChangedEvent changedEvent) { * Creates a cluster state listener and registers it with the cluster service. The listener passed as a * parameter will be called when the restore is complete. */ - public static void createAndRegisterListener( + public static void createAndRegisterListener( ClusterService clusterService, RestoreService.RestoreCompletionResponse response, - ActionListener listener + ActionListener listener, + Function actionResponseFactory ) { - clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener)); + clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener, actionResponseFactory)); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index e7d95b9e40880..c2f79b2a27157 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -109,7 +109,12 @@ protected void clusterManagerOperation( ) { restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> { if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { - RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener); + RestoreClusterStateListener.createAndRegisterListener( + clusterService, + restoreCompletionResponse, + delegatedListener, + RestoreSnapshotResponse::new + ); } else { delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); } diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 1597b31e89871..0876bf93a557b 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -302,7 +302,16 @@ public void onFailure(Exception t) { * It is possible to run into connection exceptions here because we are getting the connection early and might * run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy. */ - fork(() -> onShardFailure(shardIndex, shard, shardIt, e)); + fork(() -> { + // It only happens when onPhaseDone() is called and executePhaseOnShard() fails hard with an exception. + // In this case calling onShardFailure() would overflow the operations counter, so the best we could do + // here is to fail the phase and move on to the next one. + if (totalOps.get() == expectedTotalOps) { + onPhaseFailure(this, "The phase has failed", e); + } else { + onShardFailure(shardIndex, shard, shardIt, e); + } + }); } finally { executeNext(pendingExecutions, thread); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java index 57831896db714..c9d0d6e2d3d47 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java @@ -32,12 +32,14 @@ package org.opensearch.action.search; +import org.opensearch.common.MemoizedSupplier; import org.opensearch.search.fetch.ShardFetchSearchRequest; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.TaskId; import java.util.Map; +import java.util.function.Supplier; /** * Task storing information about a currently running search shard request. @@ -46,9 +48,28 @@ * @opensearch.internal */ public class SearchShardTask extends CancellableTask { + // generating metadata in a lazy way since source can be quite big + private final MemoizedSupplier metadataSupplier; public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + this(id, type, action, description, parentTaskId, headers, () -> ""); + } + + public SearchShardTask( + long id, + String type, + String action, + String description, + TaskId parentTaskId, + Map headers, + Supplier metadataSupplier + ) { super(id, type, action, description, parentTaskId, headers); + this.metadataSupplier = new MemoizedSupplier<>(metadataSupplier); + } + + public String getTaskMetadata() { + return metadataSupplier.get(); } @Override diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index d65ba3eddf776..a97f4ffe555b6 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -125,22 +125,27 @@ protected void masterOperation(Request request, ClusterState state, ActionListen throw new UnsupportedOperationException("Must be overridden"); } + // TODO: Add abstract keyword after removing the deprecated masterOperation() protected void clusterManagerOperation(Request request, ClusterState state, ActionListener listener) throws Exception { masterOperation(request, state, listener); } - /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #clusterManagerOperation(Task, ClusterManagerNodeRequest, ClusterState, ActionListener)} */ + /** + * Override this operation if access to the task parameter is needed + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #clusterManagerOperation(Task, ClusterManagerNodeRequest, ClusterState, ActionListener)} + */ @Deprecated protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - clusterManagerOperation(task, request, state, listener); + clusterManagerOperation(request, state, listener); } /** * Override this operation if access to the task parameter is needed */ + // TODO: Change the implementation to call 'clusterManagerOperation(request...)' after removing the deprecated masterOperation() protected void clusterManagerOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - clusterManagerOperation(request, state, listener); + masterOperation(task, request, state, listener); } protected boolean localExecute(Request request) { diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java index 1411ff7b30695..c43256a61e8b4 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java @@ -88,6 +88,7 @@ protected final void clusterManagerOperation(final Request request, final Cluste doClusterManagerOperation(request, concreteIndices, state, listener); } + // TODO: Add abstract keyword after removing the deprecated doMasterOperation() protected void doClusterManagerOperation( Request request, String[] concreteIndices, diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index f4eaa979ff18c..7a7b98bf724f6 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -62,6 +62,8 @@ import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequest; import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder; import org.opensearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequestBuilder; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; @@ -577,6 +579,11 @@ public interface ClusterAdminClient extends OpenSearchClient { */ void restoreSnapshot(RestoreSnapshotRequest request, ActionListener listener); + /** + * Restores from remote store. + */ + void restoreRemoteStore(RestoreRemoteStoreRequest request, ActionListener listener); + /** * Restores a snapshot. */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index f99454a8a8913..7084a856ab3d1 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -77,6 +77,9 @@ import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequest; import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder; import org.opensearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequestBuilder; @@ -1109,6 +1112,11 @@ public void restoreSnapshot(RestoreSnapshotRequest request, ActionListener listener) { + execute(RestoreRemoteStoreAction.INSTANCE, request, listener); + } + @Override public RestoreSnapshotRequestBuilder prepareRestoreSnapshot(String repository, String snapshot) { return new RestoreSnapshotRequestBuilder(this, RestoreSnapshotAction.INSTANCE, repository, snapshot); diff --git a/server/src/main/java/org/opensearch/cluster/LocalNodeClusterManagerListener.java b/server/src/main/java/org/opensearch/cluster/LocalNodeClusterManagerListener.java index c86aa00a6f2a2..c07dcc5daaee6 100644 --- a/server/src/main/java/org/opensearch/cluster/LocalNodeClusterManagerListener.java +++ b/server/src/main/java/org/opensearch/cluster/LocalNodeClusterManagerListener.java @@ -42,21 +42,21 @@ public interface LocalNodeClusterManagerListener extends ClusterStateListener { /** * Called when local node is elected to be the cluster-manager */ - void onMaster(); + void onClusterManager(); /** * Called when the local node used to be the cluster-manager, a new cluster-manager was elected and it's no longer the local node. */ - void offMaster(); + void offClusterManager(); @Override default void clusterChanged(ClusterChangedEvent event) { final boolean wasClusterManager = event.previousState().nodes().isLocalNodeElectedClusterManager(); final boolean isClusterManager = event.localNodeClusterManager(); if (wasClusterManager == false && isClusterManager) { - onMaster(); + onClusterManager(); } else if (wasClusterManager && isClusterManager == false) { - offMaster(); + offClusterManager(); } } } diff --git a/server/src/main/java/org/opensearch/cluster/LocalNodeMasterListener.java b/server/src/main/java/org/opensearch/cluster/LocalNodeMasterListener.java index eebfb60d8472d..31c0b294b8004 100644 --- a/server/src/main/java/org/opensearch/cluster/LocalNodeMasterListener.java +++ b/server/src/main/java/org/opensearch/cluster/LocalNodeMasterListener.java @@ -41,4 +41,33 @@ @Deprecated public interface LocalNodeMasterListener extends LocalNodeClusterManagerListener { + /** + * Called when local node is elected to be the cluster-manager. + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #onClusterManager()} + */ + @Deprecated + void onMaster(); + + /** + * Called when the local node used to be the cluster-manager, a new cluster-manager was elected and it's no longer the local node. + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #offClusterManager()} + */ + @Deprecated + void offMaster(); + + /** + * Called when local node is elected to be the cluster-manager. + */ + @Override + default void onClusterManager() { + onMaster(); + } + + /** + * Called when the local node used to be the cluster-manager, a new cluster-manager was elected and it's no longer the local node. + */ + @Override + default void offClusterManager() { + offMaster(); + } } diff --git a/server/src/main/java/org/opensearch/cluster/ack/AckedRequest.java b/server/src/main/java/org/opensearch/cluster/ack/AckedRequest.java index a3f74cb45a880..750f4b177cb86 100644 --- a/server/src/main/java/org/opensearch/cluster/ack/AckedRequest.java +++ b/server/src/main/java/org/opensearch/cluster/ack/AckedRequest.java @@ -48,6 +48,18 @@ public interface AckedRequest { /** * Returns the timeout for the request to be completed on the cluster-manager node + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #clusterManagerNodeTimeout()} */ - TimeValue masterNodeTimeout(); + @Deprecated + default TimeValue masterNodeTimeout() { + throw new UnsupportedOperationException("Must be overridden"); + } + + /** + * Returns the timeout for the request to be completed on the cluster-manager node + */ + // TODO: Remove default implementation after removing the deprecated masterNodeTimeout() + default TimeValue clusterManagerNodeTimeout() { + return masterNodeTimeout(); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index fe322992cd3e5..971fb518ff1da 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -41,6 +41,7 @@ import org.opensearch.index.ShardIndexingPressureMemoryManager; import org.opensearch.index.ShardIndexingPressureSettings; import org.opensearch.index.ShardIndexingPressureStore; +import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; @@ -577,7 +578,8 @@ public void apply(Settings value, Settings current, Settings previous) { ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS, IndexingPressure.MAX_INDEXING_BYTES, - TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED + TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED, + TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED ) ) ); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index af175be286b13..1f9306cf51e1e 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -69,6 +69,12 @@ public NRTReplicationEngine(EngineConfig engineConfig) { this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); this.readerManager = readerManager; this.readerManager.addListener(completionStatsCache); + for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) { + this.readerManager.addListener(listener); + } + for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { + this.readerManager.addListener(listener); + } final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); translogManagerRef = new WriteOnlyTranslogManager( @@ -193,6 +199,18 @@ protected ReferenceManager getReferenceManager(Search return readerManager; } + /** + * Refreshing of this engine will only happen internally when a new set of segments is received. The engine will ignore external + * refresh attempts so we can return false here. Further Engine's existing implementation reads DirectoryReader.isCurrent after acquiring a searcher. + * With this Engine's NRTReplicationReaderManager, This will use StandardDirectoryReader's implementation which determines if the reader is current by + * comparing the on-disk SegmentInfos version against the one in the reader, which at refresh points will always return isCurrent false and then refreshNeeded true. + * Even if this method returns refresh as needed, we ignore it and only ever refresh with incoming SegmentInfos. + */ + @Override + public boolean refreshNeeded() { + return false; + } + @Override public Closeable acquireHistoryRetentionLock() { throw new UnsupportedOperationException("Not implemented"); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9694f3dd37f80..b05eec4304cd5 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3778,6 +3778,10 @@ public boolean scheduledRefresh() { if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it && isSearchIdle() && indexSettings.isExplicitRefresh() == false + && indexSettings.isSegRepEnabled() == false + // Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only + // after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving + // a new set of segments. && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive // lets skip this refresh since we are search idle and // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index abcef1bd91944..8afb5bd055636 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -125,10 +125,13 @@ public int hashCode() { } /** - * Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null + * Checks if current replication checkpoint is AheadOf `other` replication checkpoint point by first comparing + * primaryTerm followed by segmentInfosVersion. Returns true when `other` is null. */ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { - return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm(); + return other == null + || primaryTerm > other.getPrimaryTerm() + || (primaryTerm == other.getPrimaryTerm() && segmentInfosVersion > other.getSegmentInfosVersion()); } @Override diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java new file mode 100644 index 0000000000000..fca6745167bb4 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.POST; + +/** + * Restores data from remote store + * + * @opensearch.api + */ +public final class RestRestoreRemoteStoreAction extends BaseRestHandler { + + @Override + public List routes() { + return singletonList(new Route(POST, "/_remotestore/_restore")); + } + + @Override + public String getName() { + return "restore_remote_store_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + RestoreRemoteStoreRequest restoreRemoteStoreRequest = new RestoreRemoteStoreRequest(); + restoreRemoteStoreRequest.masterNodeTimeout( + request.paramAsTime("cluster_manager_timeout", restoreRemoteStoreRequest.masterNodeTimeout()) + ); + restoreRemoteStoreRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false)); + request.applyContentParser(p -> restoreRemoteStoreRequest.source(p.mapOrdered())); + return channel -> client.admin().cluster().restoreRemoteStore(restoreRemoteStoreRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java index a4d7b2bed516c..828c2f8c78d69 100644 --- a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java @@ -51,6 +51,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.ToXContent; import org.opensearch.index.Index; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; @@ -71,6 +72,7 @@ import org.opensearch.transport.TransportRequest; import java.io.IOException; +import java.util.Collections; import java.util.Arrays; import java.util.Map; import java.util.function.Function; @@ -85,6 +87,8 @@ * @opensearch.internal */ public class ShardSearchRequest extends TransportRequest implements IndicesRequest { + public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); + private final String clusterAlias; private final ShardId shardId; private final int numberOfShards; @@ -501,7 +505,7 @@ public String getClusterAlias() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier); } @Override @@ -510,6 +514,16 @@ public String getDescription() { return "shardId[" + shardId() + "]"; } + public String getMetadataSupplier() { + StringBuilder sb = new StringBuilder(); + if (source != null) { + sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]"); + } else { + sb.append("source[]"); + } + return sb.toString(); + } + public Rewriteable getRewriteable() { return new RequestRewritable(this); } diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java b/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java index ae2f9e8fab989..ca74942decb50 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java @@ -123,7 +123,7 @@ public IndicesOptions indicesOptions() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier); } public String getDescription() { @@ -137,4 +137,7 @@ public String getDescription() { return sb.toString(); } + public String getMetadataSupplier() { + return shardSearchRequest().getMetadataSupplier(); + } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskManager.java b/server/src/main/java/org/opensearch/tasks/TaskManager.java index 01b6b8ea603bf..334cde81dfb6a 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskManager.java +++ b/server/src/main/java/org/opensearch/tasks/TaskManager.java @@ -51,6 +51,8 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; @@ -58,6 +60,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.concurrent.ConcurrentMapLong; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TcpChannel; @@ -75,6 +78,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -92,6 +96,15 @@ public class TaskManager implements ClusterStateApplier { private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100); + public static final String TASK_RESOURCE_CONSUMERS_ATTRIBUTES = "task_resource_consumers.enabled"; + + public static final Setting TASK_RESOURCE_CONSUMERS_ENABLED = Setting.boolSetting( + TASK_RESOURCE_CONSUMERS_ATTRIBUTES, + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** * Rest headers that are copied to the task */ @@ -116,10 +129,26 @@ public class TaskManager implements ClusterStateApplier { private final Map channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap(); private final SetOnce cancellationService = new SetOnce<>(); + private volatile boolean taskResourceConsumersEnabled; + private final Set> taskResourceConsumer; + + public static TaskManager createTaskManagerWithClusterSettings( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { + final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders); + clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_CONSUMERS_ENABLED, taskManager::setTaskResourceConsumersEnabled); + return taskManager; + } + public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { this.threadPool = threadPool; this.taskHeaders = new ArrayList<>(taskHeaders); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); + this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings); + this.taskResourceConsumer = Set.of(new TopNSearchTasksLogger(settings)); } public void setTaskResultsService(TaskResultsService taskResultsService) { @@ -135,6 +164,10 @@ public void setTaskResourceTrackingService(TaskResourceTrackingService taskResou this.taskResourceTrackingService.set(taskResourceTrackingService); } + public void setTaskResourceConsumersEnabled(boolean taskResourceConsumersEnabled) { + this.taskResourceConsumersEnabled = taskResourceConsumersEnabled; + } + /** * Registers a task without parent task */ @@ -240,6 +273,16 @@ public Task unregister(Task task) { // Decrement the task's self-thread as part of unregistration. task.decrementResourceTrackingThreads(); + if (taskResourceConsumersEnabled) { + for (Consumer taskConsumer : taskResourceConsumer) { + try { + taskConsumer.accept(task); + } catch (Exception e) { + logger.error("error encountered when updating the consumer", e); + } + } + } + if (task instanceof CancellableTask) { CancellableTaskHolder holder = cancellableTasks.remove(task.getId()); if (holder != null) { diff --git a/server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java b/server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java new file mode 100644 index 0000000000000..1755db3ab4ae8 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.logging.OpenSearchLogMessage; + +import java.util.HashMap; +import java.util.Map; + +/** + * Search shard task information that will be extracted from Task and converted into + * format that will be logged + * + * @opensearch.internal + */ +public final class SearchShardTaskDetailsLogMessage extends OpenSearchLogMessage { + SearchShardTaskDetailsLogMessage(SearchShardTask task) { + super(prepareMap(task), message(task)); + } + + private static Map prepareMap(SearchShardTask task) { + Map messageFields = new HashMap<>(); + messageFields.put("taskId", task.getId()); + messageFields.put("type", task.getType()); + messageFields.put("action", task.getAction()); + messageFields.put("description", task.getDescription()); + messageFields.put("start_time_millis", task.getStartTime()); + messageFields.put("parentTaskId", task.getParentTaskId()); + messageFields.put("resource_stats", task.getResourceStats()); + messageFields.put("metadata", task.getTaskMetadata()); + return messageFields; + } + + // Message will be used in plaintext logs + private static String message(SearchShardTask task) { + StringBuilder sb = new StringBuilder(); + sb.append("taskId:[") + .append(task.getId()) + .append("], ") + .append("type:[") + .append(task.getType()) + .append("], ") + .append("action:[") + .append(task.getAction()) + .append("], ") + .append("description:[") + .append(task.getDescription()) + .append("], ") + .append("start_time_millis:[") + .append(task.getStartTime()) + .append("], ") + .append("resource_stats:[") + .append(task.getResourceStats()) + .append("], ") + .append("metadata:[") + .append(task.getTaskMetadata()) + .append("]"); + return sb.toString(); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java b/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java new file mode 100644 index 0000000000000..dd7e200d7f4b2 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.Task; + +import java.util.Comparator; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.function.Consumer; + +/** + * A simple listener that logs resource information of high memory consuming search tasks + * + * @opensearch.internal + */ +public class TopNSearchTasksLogger implements Consumer { + public static final String TASK_DETAILS_LOG_PREFIX = "task.detailslog"; + public static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size"; + public static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency"; + + private static final Logger SEARCH_TASK_DETAILS_LOGGER = LogManager.getLogger(TASK_DETAILS_LOG_PREFIX + ".search"); + + // number of memory expensive search tasks that are logged + private static final Setting LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting( + LOG_TOP_QUERIES_SIZE, + 10, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + // frequency in which memory expensive search tasks are logged + private static final Setting LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting( + LOG_TOP_QUERIES_FREQUENCY, + TimeValue.timeValueSeconds(60L), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final int topQueriesSize; + private final long topQueriesLogFrequencyInNanos; + private final Queue> topQueries; + private long lastReportedTimeInNanos = System.nanoTime(); + + public TopNSearchTasksLogger(Settings settings) { + this.topQueriesSize = LOG_TOP_QUERIES_SIZE_SETTING.get(settings); + this.topQueriesLogFrequencyInNanos = LOG_TOP_QUERIES_FREQUENCY_SETTING.get(settings).getNanos(); + this.topQueries = new PriorityQueue<>(topQueriesSize, Comparator.comparingLong(Tuple::v1)); + } + + /** + * Called when task is unregistered and task has resource stats present. + */ + @Override + public void accept(Task task) { + if (task instanceof SearchShardTask) { + recordSearchTask((SearchShardTask) task); + } + } + + private synchronized void recordSearchTask(SearchShardTask searchTask) { + final long memory_in_bytes = searchTask.getTotalResourceUtilization(ResourceStats.MEMORY); + if (System.nanoTime() - lastReportedTimeInNanos >= topQueriesLogFrequencyInNanos) { + publishTopNEvents(); + lastReportedTimeInNanos = System.nanoTime(); + } + if (topQueries.size() >= topQueriesSize && topQueries.peek().v1() < memory_in_bytes) { + // evict the element + topQueries.poll(); + } + if (topQueries.size() < topQueriesSize) { + topQueries.offer(new Tuple<>(memory_in_bytes, searchTask)); + } + } + + private void publishTopNEvents() { + logTopResourceConsumingQueries(); + topQueries.clear(); + } + + private void logTopResourceConsumingQueries() { + for (Tuple topQuery : topQueries) { + SEARCH_TASK_DETAILS_LOGGER.info(new SearchShardTaskDetailsLogMessage(topQuery.v2())); + } + } +} diff --git a/server/src/main/java/org/opensearch/tasks/consumer/package-info.java b/server/src/main/java/org/opensearch/tasks/consumer/package-info.java new file mode 100644 index 0000000000000..40219a1cead5b --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Support for adding consumers to consume task related information. + */ +package org.opensearch.tasks.consumer; diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 1a280f2475e5d..aaba06196bc59 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -210,7 +210,7 @@ public TransportService( setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings)); tracerLog = Loggers.getLogger(logger, ".tracer"); - taskManager = createTaskManager(settings, threadPool, taskHeaders); + taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders); this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); @@ -246,8 +246,17 @@ public TaskManager getTaskManager() { return taskManager; } - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { - return new TaskManager(settings, threadPool, taskHeaders); + protected TaskManager createTaskManager( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { + if (clusterSettings != null) { + return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders); + } else { + return new TaskManager(settings, threadPool, taskHeaders); + } } /** diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index fd6f5d17a3a80..68cf69e30f8a6 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -53,6 +53,7 @@ import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.BoundTransportAddress; import org.opensearch.common.util.PageCacheRecycler; @@ -218,11 +219,16 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { Collections.emptySet() ) { @Override - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + protected TaskManager createTaskManager( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(settings, threadPool, taskHeaders); + return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders); } } }; diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponseTests.java new file mode 100644 index 0000000000000..b52729c1bbfd7 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponseTests.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.snapshots.RestoreInfo; +import org.opensearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class RestoreRemoteStoreResponseTests extends AbstractXContentTestCase { + + @Override + protected RestoreRemoteStoreResponse createTestInstance() { + if (randomBoolean()) { + String name = "remote_store"; + List indices = new ArrayList<>(); + indices.add("test0"); + indices.add("test1"); + int totalShards = randomIntBetween(1, 1000); + int successfulShards = randomIntBetween(0, totalShards); + return new RestoreRemoteStoreResponse(new RestoreInfo(name, indices, totalShards, successfulShards)); + } else { + return new RestoreRemoteStoreResponse((RestoreInfo) null); + } + } + + @Override + protected RestoreRemoteStoreResponse doParseInstance(XContentParser parser) throws IOException { + return RestoreRemoteStoreResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index b44b59b8a4ad5..ad2657517df9a 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -477,6 +477,57 @@ public void onFailure(Exception e) { assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); } + public void testExecutePhaseOnShardFailure() throws InterruptedException { + final Index index = new Index("test", UUID.randomUUID().toString()); + + final SearchShardIterator[] shards = IntStream.range(0, 2 + randomInt(3)) + .mapToObj(i -> new SearchShardIterator(null, new ShardId(index, i), List.of("n1", "n2", "n3"), null, null, null)) + .toArray(SearchShardIterator[]::new); + + final AtomicBoolean fail = new AtomicBoolean(true); + final CountDownLatch latch = new CountDownLatch(1); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + searchRequest.setMaxConcurrentShardRequests(5); + + final ArraySearchPhaseResults queryResult = new ArraySearchPhaseResults<>(shards.length); + AbstractSearchAsyncAction action = createAction( + searchRequest, + queryResult, + new ActionListener() { + @Override + public void onResponse(SearchResponse response) {} + + @Override + public void onFailure(Exception e) { + try { + // We end up here only when onPhaseDone() is called (causing NPE) and + // ending up in the onPhaseFailure() callback + if (fail.compareAndExchange(true, false)) { + assertThat(e, instanceOf(SearchPhaseExecutionException.class)); + throw new RuntimeException("Simulated exception"); + } + } finally { + executor.submit(() -> latch.countDown()); + } + } + }, + false, + false, + new AtomicLong(), + shards + ); + action.run(); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + + InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); + SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, action.buildShardFailures(), null, null); + assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations()); + assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest()); + assertSame(searchResponse.getProfileResults(), internalSearchResponse.profile()); + assertSame(searchResponse.getHits(), internalSearchResponse.hits()); + assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); + } + private static final class PhaseResult extends SearchPhaseResult { PhaseResult(ShardSearchContextId contextId) { this.contextId = contextId; diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 9ec9e656257d6..1195ed2590b1e 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -274,6 +274,43 @@ protected void clusterManagerOperation(Task task, Request request, ClusterState } } + /* The test is copied from testLocalOperationWithoutBlocks() + to validate the backwards compatibility for the deprecated method masterOperation(with task parameter). */ + public void testDeprecatedMasterOperationWithTaskParameterCanBeCalled() throws ExecutionException, InterruptedException { + final boolean clusterManagerOperationFailure = randomBoolean(); + + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + + final Exception exception = new Exception(); + final Response response = new Response(); + + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes)); + + new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + if (clusterManagerOperationFailure) { + listener.onFailure(exception); + } else { + listener.onResponse(response); + } + } + }.execute(request, listener); + assertTrue(listener.isDone()); + + if (clusterManagerOperationFailure) { + try { + listener.get(); + fail("Expected exception but returned proper result"); + } catch (ExecutionException ex) { + assertThat(ex.getCause(), equalTo(exception)); + } + } else { + assertThat(listener.get(), equalTo(response)); + } + } + public void testLocalOperationWithBlocks() throws ExecutionException, InterruptedException { final boolean retryableBlock = randomBoolean(); final boolean unblockBeforeTimeout = randomBoolean(); diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java index 62dae0622eb85..e6da768650088 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java @@ -299,12 +299,12 @@ public void testLocalNodeClusterManagerListenerCallbacks() { AtomicBoolean isClusterManager = new AtomicBoolean(); timedClusterApplierService.addLocalNodeClusterManagerListener(new LocalNodeClusterManagerListener() { @Override - public void onMaster() { + public void onClusterManager() { isClusterManager.set(true); } @Override - public void offMaster() { + public void offClusterManager() { isClusterManager.set(false); } }); diff --git a/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java b/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java index 38c8491d79150..75a346e444b73 100644 --- a/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java @@ -86,8 +86,8 @@ public static void init() throws IllegalAccessException { @AfterClass public static void cleanup() { - appender.stop(); Loggers.removeAppender(testLogger1, appender); + appender.stop(); } public void testLevelPrecedence() { diff --git a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java index ae159092a4833..ea146ec20b16a 100644 --- a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java @@ -84,9 +84,9 @@ public static void init() throws IllegalAccessException { @AfterClass public static void cleanup() { - appender.stop(); Loggers.removeAppender(queryLog, appender); Loggers.removeAppender(fetchLog, appender); + appender.stop(); } @Override diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java new file mode 100644 index 0000000000000..3fcf6116b11a2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.indices.replication.common.ReplicationType; + +public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + public void testIgnoreShardIdle() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + final int numDocs = shards.indexDocs(randomInt(10)); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + primary.scheduledRefresh(); + replica.scheduledRefresh(); + + primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + + // Update the search_idle setting, this will put both shards into search idle. + Settings updatedSettings = Settings.builder() + .put(settings) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .build(); + primary.indexSettings().getScopedSettings().applySettings(updatedSettings); + replica.indexSettings().getScopedSettings().applySettings(updatedSettings); + + primary.scheduledRefresh(); + replica.scheduledRefresh(); + + // Shards without segrep will register a new RefreshListener on the engine and return true when registered, + // assert with segrep enabled that awaitShardSearchActive does not register a listener. + primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + } + } +} diff --git a/server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java b/server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java new file mode 100644 index 0000000000000..641fdef4891bd --- /dev/null +++ b/server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.ResourceStatsType; +import org.opensearch.tasks.ResourceUsageMetric; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; + +public class SearchShardTaskDetailsLogMessageTests extends OpenSearchSingleNodeTestCase { + public void testTaskDetailsLogHasJsonFields() { + SearchShardTask task = new SearchShardTask( + 0, + "n/a", + "n/a", + "test", + null, + Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"), + () -> "test_metadata" + ); + SearchShardTaskDetailsLogMessage p = new SearchShardTaskDetailsLogMessage(task); + + assertThat(p.getValueFor("taskId"), equalTo("0")); + assertThat(p.getValueFor("type"), equalTo("n/a")); + assertThat(p.getValueFor("action"), equalTo("n/a")); + assertThat(p.getValueFor("description"), equalTo("test")); + assertThat(p.getValueFor("parentTaskId"), equalTo(null)); + // when no resource information present + assertThat(p.getValueFor("resource_stats"), equalTo("{}")); + assertThat(p.getValueFor("metadata"), equalTo("test_metadata")); + + task.startThreadResourceTracking( + 0, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, 0L), + new ResourceUsageMetric(ResourceStats.CPU, 0L) + ); + task.updateThreadResourceStats( + 0, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, 100), + new ResourceUsageMetric(ResourceStats.CPU, 100) + ); + assertThat( + p.getValueFor("resource_stats"), + equalTo("{0=[{cpu_time_in_nanos=100, memory_in_bytes=100}, stats_type=worker_stats, is_active=true, threadId=0]}") + ); + } +} diff --git a/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java b/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java new file mode 100644 index 0000000000000..a8fd3623ef09d --- /dev/null +++ b/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.logging.MockAppender; +import org.opensearch.common.settings.Settings; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.ResourceStatsType; +import org.opensearch.tasks.ResourceUsageMetric; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.Collections; + +import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY; +import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE; + +public class TopNSearchTasksLoggerTests extends OpenSearchSingleNodeTestCase { + static MockAppender appender; + static Logger searchLogger = LogManager.getLogger(TopNSearchTasksLogger.TASK_DETAILS_LOG_PREFIX + ".search"); + + private TopNSearchTasksLogger topNSearchTasksLogger; + + @BeforeClass + public static void init() throws IllegalAccessException { + appender = new MockAppender("trace_appender"); + appender.start(); + Loggers.addAppender(searchLogger, appender); + } + + @AfterClass + public static void cleanup() { + Loggers.removeAppender(searchLogger, appender); + appender.stop(); + } + + public void testLoggerWithTasks() { + final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "0ms").build(); + topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + generateTasks(5); + LogEvent logEvent = appender.getLastEventAndReset(); + assertNotNull(logEvent); + assertEquals(logEvent.getLevel(), Level.INFO); + assertTrue(logEvent.getMessage().getFormattedMessage().contains("cpu_time_in_nanos=300, memory_in_bytes=300")); + } + + public void testLoggerWithoutTasks() { + final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "500ms").build(); + topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + + assertNull(appender.getLastEventAndReset()); + } + + public void testLoggerWithHighFrequency() { + // setting the frequency to a really large value and confirming that nothing gets written to log file. + final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "10m").build(); + topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + generateTasks(5); + generateTasks(2); + + assertNull(appender.getLastEventAndReset()); + } + + // generate search tasks and updates the topN search tasks logger consumer. + public void generateTasks(int numberOfTasks) { + for (int i = 0; i < numberOfTasks; i++) { + Task task = new SearchShardTask( + i, + "n/a", + "n/a", + "test", + null, + Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"), + () -> "n/a" + ); + task.startThreadResourceTracking( + i, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, 0L), + new ResourceUsageMetric(ResourceStats.CPU, 0L) + ); + task.updateThreadResourceStats( + i, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, i * 100L), + new ResourceUsageMetric(ResourceStats.CPU, i * 100L) + ); + topNSearchTasksLogger.accept(task); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 5e22a7c145a39..249ffcfd0bf6e 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -76,6 +76,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; @@ -139,6 +140,16 @@ protected ReplicationGroup createGroup(int replicas, Settings settings) throws I return new ReplicationGroup(metadata); } + protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFactory engineFactory) throws IOException { + IndexMetadata metadata = buildIndexMetadata(replicas, settings, indexMapping); + return new ReplicationGroup(metadata) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return engineFactory; + } + }; + } + protected IndexMetadata buildIndexMetadata(int replicas) throws IOException { return buildIndexMetadata(replicas, indexMapping); } @@ -191,6 +202,7 @@ protected class ReplicationGroup implements AutoCloseable, Iterable private final AtomicInteger docId = new AtomicInteger(); boolean closed = false; private volatile ReplicationTargets replicationTargets; + private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer( new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index c5ee54450cce2..7dedc572ff19b 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -33,9 +33,15 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.junit.Assert; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; @@ -58,6 +64,7 @@ import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.internal.io.IOUtils; @@ -82,6 +89,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; @@ -94,7 +102,14 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import org.opensearch.indices.replication.SegmentReplicationSource; +import org.opensearch.indices.replication.SegmentReplicationTarget; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; @@ -112,6 +127,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -122,6 +138,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; /** @@ -1133,4 +1150,117 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { } }; } + + /** + * Segment Replication specific test method - Replicate segments to a list of replicas from a given primary. + * This test will use a real {@link SegmentReplicationTarget} for each replica with a mock {@link SegmentReplicationSource} that + * writes all segments directly to the target. + */ + public final void replicateSegments(IndexShard primaryShard, List replicaShards) throws IOException, InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); + Store.MetadataSnapshot primaryMetadata; + try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { + final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); + primaryMetadata = primaryShard.store().getMetadata(primarySegmentInfos); + } + final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard); + + final ReplicationCollection replicationCollection = new ReplicationCollection<>(logger, threadPool); + final SegmentReplicationSource source = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse( + new CheckpointInfoResponse( + copyState.getCheckpoint(), + copyState.getMetadataSnapshot(), + copyState.getInfosBytes(), + copyState.getPendingDeleteFiles() + ) + ); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + try ( + final ReplicationCollection.ReplicationRef replicationRef = replicationCollection.get( + replicationId + ) + ) { + writeFileChunks(replicationRef.get(), primaryShard, filesToFetch.toArray(new StoreFileMetadata[] {})); + } catch (IOException e) { + listener.onFailure(e); + } + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + + for (IndexShard replica : replicaShards) { + final SegmentReplicationTarget target = new SegmentReplicationTarget( + ReplicationCheckpoint.empty(replica.shardId), + replica, + source, + new ReplicationListener() { + @Override + public void onDone(ReplicationState state) { + try (final GatedCloseable snapshot = replica.getSegmentInfosSnapshot()) { + final SegmentInfos replicaInfos = snapshot.get(); + final Store.MetadataSnapshot replicaMetadata = replica.store().getMetadata(replicaInfos); + final Store.RecoveryDiff recoveryDiff = primaryMetadata.recoveryDiff(replicaMetadata); + assertTrue(recoveryDiff.missing.isEmpty()); + assertTrue(recoveryDiff.different.isEmpty()); + assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); + assertEquals(primaryMetadata.getCommitUserData(), replicaMetadata.getCommitUserData()); + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + countDownLatch.countDown(); + } + + @Override + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + logger.error("Unexpected replication failure in test", e); + Assert.fail("test replication should not fail: " + e); + } + } + ); + replicationCollection.start(target, TimeValue.timeValueMillis(5000)); + target.startReplication(new ActionListener<>() { + @Override + public void onResponse(Void o) { + replicationCollection.markAsDone(target.getId()); + } + + @Override + public void onFailure(Exception e) { + replicationCollection.fail(target.getId(), new OpenSearchException("Segment Replication failed", e), true); + } + }); + } + countDownLatch.await(3, TimeUnit.SECONDS); + } + + private void writeFileChunks(SegmentReplicationTarget target, IndexShard primary, StoreFileMetadata[] files) throws IOException { + for (StoreFileMetadata md : files) { + try (IndexInput in = primary.store().directory().openInput(md.name(), IOContext.READONCE)) { + int pos = 0; + while (pos < md.length()) { + int length = between(1, Math.toIntExact(md.length() - pos)); + byte[] buffer = new byte[length]; + in.readBytes(buffer, 0, length); + target.writeFileChunk(md, pos, new BytesArray(buffer), pos + length == md.length(), 0, mock(ActionListener.class)); + pos += length; + } + } + } + } } diff --git a/test/framework/src/main/java/org/opensearch/test/TestCluster.java b/test/framework/src/main/java/org/opensearch/test/TestCluster.java index c2e90f0369e6c..478b692fb06ef 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/TestCluster.java @@ -127,6 +127,7 @@ public void assertAfterTest() throws Exception { /** * Returns the number of data and cluster-manager eligible nodes in the cluster. */ + // TODO: Add abstract keyword after removing the deprecated numDataAndMasterNodes() public int numDataAndClusterManagerNodes() { return numDataAndMasterNodes(); } diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java index 9b9baebd540c3..c80b120ad0148 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java @@ -261,11 +261,16 @@ private static TransportAddress[] extractTransportAddresses(TransportService tra } @Override - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + protected TaskManager createTaskManager( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(settings, threadPool, taskHeaders); + return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders); } } @@ -530,7 +535,6 @@ public void clearCallback() { /** * Adds a new handling behavior that is used when the defined request is received. - * */ public void addRequestHandlingBehavior( String actionName,