diff --git a/CHANGELOG.md b/CHANGELOG.md index e8ed344a85ab1..b46351e68e89c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Recommissioning of zone. REST layer support. ([#4624](https://github.com/opensearch-project/OpenSearch/pull/4604)) - Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565)) - Apply reproducible builds configuration for OpenSearch plugins through gradle plugin ([#4746](https://github.com/opensearch-project/OpenSearch/pull/4746)) +- Add groupId value propagation tests for ZIP publication task ([#4772](https://github.com/opensearch-project/OpenSearch/pull/4772)) - Add support for GeoJson Point type in GeoPoint field ([#4597](https://github.com/opensearch-project/OpenSearch/pull/4597)) ### Dependencies @@ -42,7 +43,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Bumps `azure-storage-common` from 12.18.0 to 12.18.1 - Bumps `forbiddenapis` from 3.3 to 3.4 - Bumps `gson` from 2.9.0 to 2.9.1 -- Bumps `protobuf-java` from 3.21.2 to 3.21.7 +- Bumps `protobuf-java` from 3.21.2 to 3.21.7k - Bumps `azure-core` from 1.31.0 to 1.33.0 - Bumps `avro` from 1.11.0 to 1.11.1 - Bumps `woodstox-core` from 6.3.0 to 6.3.1 @@ -60,6 +61,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Bumps `jna` from 5.11.0 to 5.12.1 ([#4656](https://github.com/opensearch-project/OpenSearch/pull/4656)) - Update Jackson Databind to 2.13.4.2 (addressing CVE-2022-42003) ([#4779](https://github.com/opensearch-project/OpenSearch/pull/4779)) - Bumps `tika` from 2.4.0 to 2.5.0 ([#4791](https://github.com/opensearch-project/OpenSearch/pull/4791)) +- Exclude jettison version brought in with hadoop-minicluster. ([#4787](https://github.com/opensearch-project/OpenSearch/pull/4787)) +- Bump protobuf-java to 3.21.7 in repository-gcs and repository-hdfs ([#]()) + ### Changed - Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) - Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240)) @@ -87,6 +91,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix weighted routing metadata deserialization error on process restart ([#4691](https://github.com/opensearch-project/OpenSearch/pull/4691)) - Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732)) - Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459)) +- Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761)) ### Deprecated ### Removed - Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568)) @@ -136,8 +141,13 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix new race condition in DecommissionControllerTests ([4688](https://github.com/opensearch-project/OpenSearch/pull/4688)) - Fix SearchStats (de)serialization (caused by https://github.com/opensearch-project/OpenSearch/pull/4616) ([#4697](https://github.com/opensearch-project/OpenSearch/pull/4697)) - Fixing Gradle warnings associated with publishPluginZipPublicationToXxx tasks ([#4696](https://github.com/opensearch-project/OpenSearch/pull/4696)) +- [BUG]: Remove redundant field from GetDecommissionStateResponse ([#4751](https://github.com/opensearch-project/OpenSearch/pull/4751)) - Fixed randomly failing test ([4774](https://github.com/opensearch-project/OpenSearch/pull/4774)) - Update version check after backport ([4786](https://github.com/opensearch-project/OpenSearch/pull/4786)) +- Fix decommission status update to non leader nodes ([4800](https://github.com/opensearch-project/OpenSearch/pull/4800)) +- Fix recovery path for searchable snapshots ([4813](https://github.com/opensearch-project/OpenSearch/pull/4813)) +- Fix bug in AwarenessAttributeDecommissionIT([4822](https://github.com/opensearch-project/OpenSearch/pull/4822)) + ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) @@ -158,6 +168,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296)) - Commit workflow for dependabot changelog helper ([#4331](https://github.com/opensearch-project/OpenSearch/pull/4331)) - Better plural stemmer than minimal_english ([#4738](https://github.com/opensearch-project/OpenSearch/pull/4738)) +- Disable merge on refresh in DiskThresholdDeciderIT ([#4828](https://github.com/opensearch-project/OpenSearch/pull/4828)) ### Security [Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD diff --git a/buildSrc/src/test/java/org/opensearch/gradle/pluginzip/PublishTests.java b/buildSrc/src/test/java/org/opensearch/gradle/pluginzip/PublishTests.java index 2ca0e507acb44..148a836f32b41 100644 --- a/buildSrc/src/test/java/org/opensearch/gradle/pluginzip/PublishTests.java +++ b/buildSrc/src/test/java/org/opensearch/gradle/pluginzip/PublishTests.java @@ -271,6 +271,76 @@ public void useDefaultValues() throws IOException, URISyntaxException, XmlPullPa assertEquals(model.getUrl(), "https://github.com/doe/sample-plugin"); } + /** + * If the `group` is defined in gradle's allprojects section then it does not have to defined in publications. + */ + @Test + public void allProjectsGroup() throws IOException, URISyntaxException, XmlPullParserException { + GradleRunner runner = prepareGradleRunnerFromTemplate("allProjectsGroup.gradle", "build", ZIP_PUBLISH_TASK); + BuildResult result = runner.build(); + + /** Check if build and {@value ZIP_PUBLISH_TASK} tasks have run well */ + assertEquals(SUCCESS, result.task(":" + "build").getOutcome()); + assertEquals(SUCCESS, result.task(":" + ZIP_PUBLISH_TASK).getOutcome()); + + // Parse the maven file and validate default values + MavenXpp3Reader reader = new MavenXpp3Reader(); + Model model = reader.read( + new FileReader( + new File( + projectDir.getRoot(), + String.join( + File.separator, + "build", + "local-staging-repo", + "org", + "opensearch", + PROJECT_NAME, + "2.0.0.0", + PROJECT_NAME + "-2.0.0.0.pom" + ) + ) + ) + ); + assertEquals(model.getVersion(), "2.0.0.0"); + assertEquals(model.getGroupId(), "org.opensearch"); + } + + /** + * The groupId value can be defined on several levels. This tests that the most internal level outweighs other levels. + */ + @Test + public void groupPriorityLevel() throws IOException, URISyntaxException, XmlPullParserException { + GradleRunner runner = prepareGradleRunnerFromTemplate("groupPriorityLevel.gradle", "build", ZIP_PUBLISH_TASK); + BuildResult result = runner.build(); + + /** Check if build and {@value ZIP_PUBLISH_TASK} tasks have run well */ + assertEquals(SUCCESS, result.task(":" + "build").getOutcome()); + assertEquals(SUCCESS, result.task(":" + ZIP_PUBLISH_TASK).getOutcome()); + + // Parse the maven file and validate default values + MavenXpp3Reader reader = new MavenXpp3Reader(); + Model model = reader.read( + new FileReader( + new File( + projectDir.getRoot(), + String.join( + File.separator, + "build", + "local-staging-repo", + "level", + "3", + PROJECT_NAME, + "2.0.0.0", + PROJECT_NAME + "-2.0.0.0.pom" + ) + ) + ) + ); + assertEquals(model.getVersion(), "2.0.0.0"); + assertEquals(model.getGroupId(), "level.3"); + } + /** * In this case the Publication entity is completely missing but still the POM file is generated using the default * values including the groupId and version values obtained from the Gradle project object. diff --git a/buildSrc/src/test/resources/pluginzip/allProjectsGroup.gradle b/buildSrc/src/test/resources/pluginzip/allProjectsGroup.gradle new file mode 100644 index 0000000000000..80638107c86e1 --- /dev/null +++ b/buildSrc/src/test/resources/pluginzip/allProjectsGroup.gradle @@ -0,0 +1,28 @@ +plugins { + id 'java-gradle-plugin' + id 'opensearch.pluginzip' +} + +version='2.0.0.0' + +// A bundlePlugin task mockup +tasks.register('bundlePlugin', Zip.class) { + archiveFileName = "sample-plugin-${version}.zip" + destinationDirectory = layout.buildDirectory.dir('distributions') + from layout.projectDirectory.file('sample-plugin-source.txt') +} + +allprojects { + group = 'org.opensearch' +} + +publishing { + publications { + pluginZip(MavenPublication) { publication -> + pom { + name = "sample-plugin" + description = "pluginDescription" + } + } + } +} diff --git a/buildSrc/src/test/resources/pluginzip/groupPriorityLevel.gradle b/buildSrc/src/test/resources/pluginzip/groupPriorityLevel.gradle new file mode 100644 index 0000000000000..4da02c9f191d8 --- /dev/null +++ b/buildSrc/src/test/resources/pluginzip/groupPriorityLevel.gradle @@ -0,0 +1,30 @@ +plugins { + id 'java-gradle-plugin' + id 'opensearch.pluginzip' +} + +version='2.0.0.0' + +// A bundlePlugin task mockup +tasks.register('bundlePlugin', Zip.class) { + archiveFileName = "sample-plugin-${version}.zip" + destinationDirectory = layout.buildDirectory.dir('distributions') + from layout.projectDirectory.file('sample-plugin-source.txt') +} + +allprojects { + group = 'level.1' +} + +publishing { + publications { + pluginZip(MavenPublication) { publication -> + groupId = "level.2" + pom { + name = "sample-plugin" + description = "pluginDescription" + groupId = "level.3" + } + } + } +} diff --git a/buildSrc/version.properties b/buildSrc/version.properties index 08784c82a4cc4..db0e5b142f7de 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -17,6 +17,7 @@ supercsv = 2.4.0 log4j = 2.17.1 slf4j = 1.7.36 asm = 9.3 +jettison = 1.5.1 # when updating the JNA version, also update the version in buildSrc/build.gradle jna = 5.12.1 diff --git a/plugins/discovery-azure-classic/build.gradle b/plugins/discovery-azure-classic/build.gradle index 8ca9491f834a6..c88d19f0e2806 100644 --- a/plugins/discovery-azure-classic/build.gradle +++ b/plugins/discovery-azure-classic/build.gradle @@ -59,7 +59,7 @@ dependencies { api "com.sun.jersey:jersey-client:${versions.jersey}" api "com.sun.jersey:jersey-core:${versions.jersey}" api "com.sun.jersey:jersey-json:${versions.jersey}" - api 'org.codehaus.jettison:jettison:1.5.1' + api "org.codehaus.jettison:jettison:${versions.jettison}" api 'com.sun.xml.bind:jaxb-impl:2.2.3-1' // HACK: javax.xml.bind was removed from default modules in java 9, so we pull the api in here, diff --git a/plugins/repository-gcs/build.gradle b/plugins/repository-gcs/build.gradle index 097e96fcd8fdc..05e879547a4b0 100644 --- a/plugins/repository-gcs/build.gradle +++ b/plugins/repository-gcs/build.gradle @@ -66,7 +66,7 @@ dependencies { api 'com.google.api:gax:2.17.0' api 'org.threeten:threetenbp:1.4.4' api 'com.google.protobuf:protobuf-java-util:3.20.0' - api 'com.google.protobuf:protobuf-java:3.19.3' + api 'com.google.protobuf:protobuf-java:3.21.7' api 'com.google.code.gson:gson:2.9.0' api 'com.google.api.grpc:proto-google-common-protos:2.8.0' api 'com.google.api.grpc:proto-google-iam-v1:0.12.0' diff --git a/plugins/repository-gcs/licenses/protobuf-java-3.19.3.jar.sha1 b/plugins/repository-gcs/licenses/protobuf-java-3.19.3.jar.sha1 deleted file mode 100644 index 655ecd1f1c1c9..0000000000000 --- a/plugins/repository-gcs/licenses/protobuf-java-3.19.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4b57f1b1b9e281231c3fcfc039ce3021e29ff570 \ No newline at end of file diff --git a/plugins/repository-gcs/licenses/protobuf-java-3.21.7.jar.sha1 b/plugins/repository-gcs/licenses/protobuf-java-3.21.7.jar.sha1 new file mode 100644 index 0000000000000..faa673a23ef41 --- /dev/null +++ b/plugins/repository-gcs/licenses/protobuf-java-3.21.7.jar.sha1 @@ -0,0 +1 @@ +96cfc7147192f1de72c3d7d06972155ffb7d180c \ No newline at end of file diff --git a/plugins/repository-hdfs/build.gradle b/plugins/repository-hdfs/build.gradle index 792bdc6bacd4a..07dc8b8547b80 100644 --- a/plugins/repository-hdfs/build.gradle +++ b/plugins/repository-hdfs/build.gradle @@ -69,7 +69,7 @@ dependencies { api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api 'com.google.code.gson:gson:2.9.0' runtimeOnly 'com.google.guava:guava:31.1-jre' - api 'com.google.protobuf:protobuf-java:3.21.4' + api 'com.google.protobuf:protobuf-java:3.21.7' api "commons-logging:commons-logging:${versions.commonslogging}" api 'commons-cli:commons-cli:1.5.0' api "commons-codec:commons-codec:${versions.commonscodec}" diff --git a/plugins/repository-hdfs/licenses/protobuf-java-3.21.4.jar.sha1 b/plugins/repository-hdfs/licenses/protobuf-java-3.21.4.jar.sha1 deleted file mode 100644 index f232c9a449547..0000000000000 --- a/plugins/repository-hdfs/licenses/protobuf-java-3.21.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9947febd7a6d0695726c78f603a149b7b7c108e0 \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/protobuf-java-3.21.7.jar.sha1 b/plugins/repository-hdfs/licenses/protobuf-java-3.21.7.jar.sha1 new file mode 100644 index 0000000000000..faa673a23ef41 --- /dev/null +++ b/plugins/repository-hdfs/licenses/protobuf-java-3.21.7.jar.sha1 @@ -0,0 +1 @@ +96cfc7147192f1de72c3d7d06972155ffb7d180c \ No newline at end of file diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_decommission_awareness.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_decommission_awareness.json index 430f96921fbc2..302dea4ec31a7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_decommission_awareness.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_decommission_awareness.json @@ -8,10 +8,16 @@ "url": { "paths": [ { - "path": "/_cluster/decommission/awareness/_status", - "methods": [ + "path":"/_cluster/decommission/awareness/{awareness_attribute_name}/_status", + "methods":[ "GET" - ] + ], + "parts":{ + "awareness_attribute_name":{ + "type":"string", + "description":"Awareness attribute name" + } + } } ] } diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java new file mode 100644 index 0000000000000..a2270d63ba6fa --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -0,0 +1,165 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction; +import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateAction; +import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.test.NodeRoles.onlyRole; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class AwarenessAttributeDecommissionIT extends OpenSearchIntegTestCase { + private final Logger logger = LogManager.getLogger(AwarenessAttributeDecommissionIT.class); + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(MockTransportService.TestPlugin.class); + } + + @After + public void cleanup() throws Exception { + assertNoTimeout(client().admin().cluster().prepareHealth().get()); + } + + public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionException, InterruptedException { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + List clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + List dataNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + ensureStableCluster(6); + + logger.info("--> starting decommissioning nodes in zone {}", 'c'); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); + assertTrue(decommissionResponse.isAcknowledged()); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // assert that decommission status is successful + GetDecommissionStateResponse response = client().execute( + GetDecommissionStateAction.INSTANCE, + new GetDecommissionStateRequest(decommissionAttribute.attributeName()) + ).get(); + assertEquals(response.getAttributeValue(), decommissionAttribute.attributeValue()); + assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL); + + ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState(); + assertEquals(4, clusterState.nodes().getSize()); + + // assert status on nodes that are part of cluster currently + Iterator discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt(); + while (discoveryNodeIterator.hasNext()) { + // assert no node has decommissioned attribute + DiscoveryNode node = discoveryNodeIterator.next(); + assertNotEquals(node.getAttributes().get("zone"), "c"); + + // assert all the nodes has status as SUCCESSFUL + ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName()); + assertEquals( + localNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), + DecommissionStatus.SUCCESSFUL + ); + } + + // assert status on decommissioned node + // Here we will verify that until it got kicked out, it received appropriate status updates + // decommissioned nodes hence will have status as IN_PROGRESS as it will be kicked out later after this + // and won't receive status update to SUCCESSFUL + String randomDecommissionedNode = randomFrom(clusterManagerNodes.get(2), dataNodes.get(2)); + ClusterService decommissionedNodeClusterService = internalCluster().getInstance(ClusterService.class, randomDecommissionedNode); + assertEquals( + decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), + DecommissionStatus.IN_PROGRESS + ); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes) + // as by then all nodes should have joined the cluster + ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java index 10e809e2fb5dc..955f0f0465d88 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java @@ -217,6 +217,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6) .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms") + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false) .build() ); final long minShardSize = createReasonableSizedShards(indexName); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 96fcf0053c9ab..0ab025bb575cc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -49,15 +49,24 @@ protected boolean addMockInternalEngine() { } public void testCreateSearchableSnapshot() throws Exception { + final int numReplicasIndex1 = randomIntBetween(1, 4); + final int numReplicasIndex2 = randomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1); final Client client = client(); createRepository("test-repo", "fs"); createIndex( "test-idx-1", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build() + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex1)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") + .build() ); createIndex( "test-idx-2", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build() + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex2)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") + .build() ); ensureGreen(); indexRandomDocs("test-idx-1", 100); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequest.java index 90150c71bf3f2..1f301aa2b5273 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequest.java @@ -10,11 +10,14 @@ import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest; +import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import java.io.IOException; +import static org.opensearch.action.ValidateActions.addValidationError; + /** * Get Decommissioned attribute request * @@ -22,19 +25,56 @@ */ public class GetDecommissionStateRequest extends ClusterManagerNodeReadRequest { + private String attributeName; + public GetDecommissionStateRequest() {} + /** + * Constructs a new get decommission state request with given attribute name + * + * @param attributeName name of the attribute + */ + public GetDecommissionStateRequest(String attributeName) { + this.attributeName = attributeName; + } + public GetDecommissionStateRequest(StreamInput in) throws IOException { super(in); + attributeName = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeString(attributeName); } @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if (attributeName == null || Strings.isEmpty(attributeName)) { + validationException = addValidationError("attribute name is missing", validationException); + } + return validationException; + } + + /** + * Sets attribute name + * + * @param attributeName attribute name + * @return this request + */ + public GetDecommissionStateRequest attributeName(String attributeName) { + this.attributeName = attributeName; + return this; + } + + /** + * Returns attribute name + * + * @return attributeName name of attribute + */ + public String attributeName() { + return this.attributeName; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestBuilder.java index 2b8616d0511cd..e766e9c674ff7 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestBuilder.java @@ -27,4 +27,13 @@ public class GetDecommissionStateRequestBuilder extends ClusterManagerNodeReadOp public GetDecommissionStateRequestBuilder(OpenSearchClient client, GetDecommissionStateAction action) { super(client, action, new GetDecommissionStateRequest()); } + + /** + * @param attributeName name of attribute + * @return current object + */ + public GetDecommissionStateRequestBuilder setAttributeName(String attributeName) { + request.attributeName(attributeName); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponse.java index 2034cdb16e40f..ec0bd7cf7e7eb 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponse.java @@ -10,7 +10,6 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionResponse; -import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -31,49 +30,40 @@ */ public class GetDecommissionStateResponse extends ActionResponse implements ToXContentObject { - private DecommissionAttribute decommissionedAttribute; + private String attributeValue; private DecommissionStatus status; GetDecommissionStateResponse() { this(null, null); } - GetDecommissionStateResponse(DecommissionAttribute decommissionedAttribute, DecommissionStatus status) { - this.decommissionedAttribute = decommissionedAttribute; + GetDecommissionStateResponse(String attributeValue, DecommissionStatus status) { + this.attributeValue = attributeValue; this.status = status; } GetDecommissionStateResponse(StreamInput in) throws IOException { // read decommissioned attribute and status only if it is present if (in.readBoolean()) { - this.decommissionedAttribute = new DecommissionAttribute(in); - } - if (in.readBoolean()) { + this.attributeValue = in.readString(); this.status = DecommissionStatus.fromString(in.readString()); } } @Override public void writeTo(StreamOutput out) throws IOException { - // if decommissioned attribute is null, mark absence of decommissioned attribute - if (decommissionedAttribute == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - decommissionedAttribute.writeTo(out); - } - - // if status is null, mark absence of status - if (status == null) { + // if decommissioned attribute value is null or status is null then mark its absence + if (attributeValue == null || status == null) { out.writeBoolean(false); } else { out.writeBoolean(true); + out.writeString(attributeValue); out.writeString(status.status()); } } - public DecommissionAttribute getDecommissionedAttribute() { - return decommissionedAttribute; + public String getAttributeValue() { + return attributeValue; } public DecommissionStatus getDecommissionStatus() { @@ -83,13 +73,8 @@ public DecommissionStatus getDecommissionStatus() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.startObject("awareness"); - if (decommissionedAttribute != null) { - builder.field(decommissionedAttribute.attributeName(), decommissionedAttribute.attributeValue()); - } - builder.endObject(); - if (status != null) { - builder.field("status", status); + if (attributeValue != null && status != null) { + builder.field(attributeValue, status); } builder.endObject(); return builder; @@ -97,58 +82,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static GetDecommissionStateResponse fromXContent(XContentParser parser) throws IOException { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - String attributeType = "awareness"; XContentParser.Token token; - DecommissionAttribute decommissionAttribute = null; + String attributeValue = null; DecommissionStatus status = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - String currentFieldName = parser.currentName(); - if (attributeType.equals(currentFieldName)) { - if (parser.nextToken() != XContentParser.Token.START_OBJECT) { - throw new OpenSearchParseException( - "failed to parse decommission attribute type [{}], expected object", - attributeType - ); - } - token = parser.nextToken(); - if (token != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - String fieldName = parser.currentName(); - String value; - token = parser.nextToken(); - if (token == XContentParser.Token.VALUE_STRING) { - value = parser.text(); - } else { - throw new OpenSearchParseException( - "failed to parse attribute [{}], expected string for attribute value", - fieldName - ); - } - decommissionAttribute = new DecommissionAttribute(fieldName, value); - parser.nextToken(); - } else { - throw new OpenSearchParseException("failed to parse attribute type [{}], unexpected type", attributeType); - } - } else { - throw new OpenSearchParseException("failed to parse attribute type [{}]", attributeType); - } - } else if ("status".equals(currentFieldName)) { - if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { - throw new OpenSearchParseException( - "failed to parse status of decommissioning, expected string but found unknown type" - ); - } - status = DecommissionStatus.fromString(parser.text().toLowerCase(Locale.ROOT)); - } else { - throw new OpenSearchParseException( - "unknown field found [{}], failed to parse the decommission attribute", - currentFieldName - ); + attributeValue = parser.currentName(); + if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { + throw new OpenSearchParseException("failed to parse status of decommissioning, expected string but found unknown type"); } + status = DecommissionStatus.fromString(parser.text().toLowerCase(Locale.ROOT)); + } else { + throw new OpenSearchParseException( + "failed to parse decommission state, expected [{}] but found [{}]", + XContentParser.Token.FIELD_NAME, + token + ); } } - return new GetDecommissionStateResponse(decommissionAttribute, status); + return new GetDecommissionStateResponse(attributeValue, status); } @Override @@ -156,11 +108,14 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; GetDecommissionStateResponse that = (GetDecommissionStateResponse) o; - return decommissionedAttribute.equals(that.decommissionedAttribute) && status == that.status; + if (!Objects.equals(attributeValue, that.attributeValue)) { + return false; + } + return Objects.equals(status, that.status); } @Override public int hashCode() { - return Objects.hash(decommissionedAttribute, status); + return Objects.hash(attributeValue, status); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java index 48ed13c6c0aaf..d811ab8cf6948 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java @@ -69,10 +69,11 @@ protected void clusterManagerOperation( ActionListener listener ) throws Exception { DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); - if (decommissionAttributeMetadata != null) { + if (decommissionAttributeMetadata != null + && request.attributeName().equals(decommissionAttributeMetadata.decommissionAttribute().attributeName())) { listener.onResponse( new GetDecommissionStateResponse( - decommissionAttributeMetadata.decommissionAttribute(), + decommissionAttributeMetadata.decommissionAttribute().attributeValue(), decommissionAttributeMetadata.status() ) ); diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index 77ddb5e17c742..4ab438ec064f1 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -873,17 +873,17 @@ public interface ClusterAdminClient extends OpenSearchClient { /** * Get Decommissioned attribute */ - ActionFuture getDecommission(GetDecommissionStateRequest request); + ActionFuture getDecommissionState(GetDecommissionStateRequest request); /** * Get Decommissioned attribute */ - void getDecommission(GetDecommissionStateRequest request, ActionListener listener); + void getDecommissionState(GetDecommissionStateRequest request, ActionListener listener); /** * Get Decommissioned attribute */ - GetDecommissionStateRequestBuilder prepareGetDecommission(); + GetDecommissionStateRequestBuilder prepareGetDecommissionState(); /** * Deletes the decommission metadata. diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index b42010d4253d5..828ca5f8083ee 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -1417,17 +1417,17 @@ public DecommissionRequestBuilder prepareDecommission(DecommissionRequest reques } @Override - public ActionFuture getDecommission(GetDecommissionStateRequest request) { + public ActionFuture getDecommissionState(GetDecommissionStateRequest request) { return execute(GetDecommissionStateAction.INSTANCE, request); } @Override - public void getDecommission(GetDecommissionStateRequest request, ActionListener listener) { + public void getDecommissionState(GetDecommissionStateRequest request, ActionListener listener) { execute(GetDecommissionStateAction.INSTANCE, request, listener); } @Override - public GetDecommissionStateRequestBuilder prepareGetDecommission() { + public GetDecommissionStateRequestBuilder prepareGetDecommissionState() { return new GetDecommissionStateRequestBuilder(this, GetDecommissionStateAction.INSTANCE); } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java index dbb3fea823eb6..d3d508bf36451 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java @@ -79,35 +79,29 @@ public DecommissionStatus status() { /** * Returns instance of the metadata with updated status * @param newStatus status to be updated with - * @return instance with valid status */ // synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe - public synchronized DecommissionAttributeMetadata setUpdatedStatus(DecommissionStatus newStatus) { - // if the current status is the expected status already, we return the same instance - if (newStatus.equals(status)) { - return this; + public synchronized void validateNewStatus(DecommissionStatus newStatus) { + // if the current status is the expected status already or new status is FAILED, we let the check pass + if (newStatus.equals(status) || newStatus.equals(DecommissionStatus.FAILED)) { + return; } // We don't expect that INIT will be new status, as it is registered only when starting the decommission action switch (newStatus) { case IN_PROGRESS: - validateAndSetStatus(DecommissionStatus.INIT, newStatus); + validateStatus(DecommissionStatus.INIT, newStatus); break; case SUCCESSFUL: - validateAndSetStatus(DecommissionStatus.IN_PROGRESS, newStatus); - break; - case FAILED: - // we don't need to validate here and directly update status to FAILED - this.status = newStatus; + validateStatus(DecommissionStatus.IN_PROGRESS, newStatus); break; default: throw new IllegalArgumentException( "illegal decommission status [" + newStatus.status() + "] requested for updating metadata" ); } - return this; } - private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatus next) { + private void validateStatus(DecommissionStatus expected, DecommissionStatus next) { if (status.equals(expected) == false) { assert false : "can't move decommission status to [" + next @@ -120,7 +114,6 @@ private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatu "can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])" ); } - status = next; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 7719012f2f3d7..b58d99a9d59db 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -246,8 +246,12 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata.status(), decommissionStatus ); - // setUpdatedStatus can throw IllegalStateException if the sequence of update is not valid - decommissionAttributeMetadata.setUpdatedStatus(decommissionStatus); + // validateNewStatus can throw IllegalStateException if the sequence of update is not valid + decommissionAttributeMetadata.validateNewStatus(decommissionStatus); + decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttributeMetadata.decommissionAttribute(), + decommissionStatus + ); return ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 8c2c85ce107a6..3d9847ca35931 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -6,6 +6,7 @@ package org.opensearch.cluster.routing.allocation; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; import java.util.ArrayList; import java.util.List; @@ -27,11 +28,11 @@ public AllocationConstraints() { } class ConstraintParams { - private BalancedShardsAllocator.Balancer balancer; + private ShardsBalancer balancer; private BalancedShardsAllocator.ModelNode node; private String index; - ConstraintParams(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { this.balancer = balancer; this.node = node; this.index = index; @@ -50,7 +51,7 @@ class ConstraintParams { * This weight function is used only in case of unassigned shards to avoid overloading a newly added node. * Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this function. */ - public long weight(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { int constraintsBreached = 0; ConstraintParams params = new ConstraintParams(balancer, node, index); for (Predicate predicate : constraintPredicates) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 181910e3ac1c4..42c8f7987bf3d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -34,47 +34,28 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; -import org.opensearch.cluster.routing.allocation.AllocationDecision; import org.opensearch.cluster.routing.allocation.MoveDecision; -import org.opensearch.cluster.routing.allocation.NodeAllocationResult; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.ShardAllocationDecision; -import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; -import org.opensearch.cluster.routing.allocation.decider.Decision; -import org.opensearch.cluster.routing.allocation.decider.Decision.Type; -import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; -import org.opensearch.gateway.PriorityComparator; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.StreamSupport; - -import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; /** * The {@link BalancedShardsAllocator} re-balances the nodes allocations @@ -160,23 +141,23 @@ public void allocate(RoutingAllocation allocation) { failAllocationOfNewPrimaries(allocation); return; } - final Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); - balancer.allocateUnassigned(); - balancer.moveShards(); - balancer.balance(); + final ShardsBalancer localShardsBalancer = new LocalShardsBalancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); + localShardsBalancer.allocateUnassigned(); + localShardsBalancer.moveShards(); + localShardsBalancer.balance(); } @Override public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) { - Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); + ShardsBalancer localShardsBalancer = new LocalShardsBalancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; if (shard.unassigned()) { - allocateUnassignedDecision = balancer.decideAllocateUnassigned(shard); + allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard); } else { - moveDecision = balancer.decideMove(shard); + moveDecision = localShardsBalancer.decideMove(shard); if (moveDecision.isDecisionTaken() && moveDecision.canRemain()) { - MoveDecision rebalanceDecision = balancer.decideRebalance(shard); + MoveDecision rebalanceDecision = localShardsBalancer.decideRebalance(shard); moveDecision = rebalanceDecision.withRemainDecision(moveDecision.getCanRemainDecision()); } } @@ -277,923 +258,18 @@ static class WeightFunction { this.constraints = new AllocationConstraints(); } - public float weightWithAllocationConstraints(Balancer balancer, ModelNode node, String index) { + public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) { float balancerWeight = weight(balancer, node, index); return balancerWeight + constraints.weight(balancer, node, index); } - float weight(Balancer balancer, ModelNode node, String index) { + float weight(ShardsBalancer balancer, ModelNode node, String index) { final float weightShard = node.numShards() - balancer.avgShardsPerNode(); final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); return theta0 * weightShard + theta1 * weightIndex; } } - /** - * A {@link Balancer} - * - * @opensearch.internal - */ - public static class Balancer { - private final Logger logger; - private final Map nodes; - private final RoutingAllocation allocation; - private final RoutingNodes routingNodes; - private final boolean movePrimaryFirst; - private final WeightFunction weight; - - private final float threshold; - private final Metadata metadata; - private final float avgShardsPerNode; - private final NodeSorter sorter; - private final Set inEligibleTargetNode; - - public Balancer(Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, WeightFunction weight, float threshold) { - this.logger = logger; - this.allocation = allocation; - this.movePrimaryFirst = movePrimaryFirst; - this.weight = weight; - this.threshold = threshold; - this.routingNodes = allocation.routingNodes(); - this.metadata = allocation.metadata(); - avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); - nodes = Collections.unmodifiableMap(buildModelFromAssigned()); - sorter = newNodeSorter(); - inEligibleTargetNode = new HashSet<>(); - } - - /** - * Returns an array view on the nodes in the balancer. Nodes should not be removed from this list. - */ - private ModelNode[] nodesArray() { - return nodes.values().toArray(new ModelNode[nodes.size()]); - } - - /** - * Returns the average of shards per node for the given index - */ - public float avgShardsPerNode(String index) { - return ((float) metadata.index(index).getTotalNumberOfShards()) / nodes.size(); - } - - /** - * Returns the global average of shards per node - */ - public float avgShardsPerNode() { - return avgShardsPerNode; - } - - /** - * Returns a new {@link NodeSorter} that sorts the nodes based on their - * current weight with respect to the index passed to the sorter. The - * returned sorter is not sorted. Use {@link NodeSorter#reset(String)} - * to sort based on an index. - */ - private NodeSorter newNodeSorter() { - return new NodeSorter(nodesArray(), weight, this); - } - - /** - * The absolute value difference between two weights. - */ - private static float absDelta(float lower, float higher) { - assert higher >= lower : higher + " lt " + lower + " but was expected to be gte"; - return Math.abs(higher - lower); - } - - /** - * Returns {@code true} iff the weight delta between two nodes is under a defined threshold. - * See {@link #THRESHOLD_SETTING} for defining the threshold. - */ - private static boolean lessThan(float delta, float threshold) { - /* deltas close to the threshold are "rounded" to the threshold manually - to prevent floating point problems if the delta is very close to the - threshold ie. 1.000000002 which can trigger unnecessary balance actions*/ - return delta <= (threshold + 0.001f); - } - - /** - * Balances the nodes on the cluster model according to the weight function. - * The actual balancing is delegated to {@link #balanceByWeights()} - */ - private void balance() { - if (logger.isTraceEnabled()) { - logger.trace("Start balancing cluster"); - } - if (allocation.hasPendingAsyncFetch()) { - /* - * see https://github.com/elastic/elasticsearch/issues/14387 - * if we allow rebalance operations while we are still fetching shard store data - * we might end up with unnecessary rebalance operations which can be super confusion/frustrating - * since once the fetches come back we might just move all the shards back again. - * Therefore we only do a rebalance if we have fetched all information. - */ - logger.debug("skipping rebalance due to in-flight shard/store fetches"); - return; - } - if (allocation.deciders().canRebalance(allocation).type() != Type.YES) { - logger.trace("skipping rebalance as it is disabled"); - return; - } - if (nodes.size() < 2) { /* skip if we only have one node */ - logger.trace("skipping rebalance as single node only"); - return; - } - balanceByWeights(); - } - - /** - * Makes a decision about moving a single shard to a different node to form a more - * optimally balanced cluster. This method is invoked from the cluster allocation - * explain API only. - */ - private MoveDecision decideRebalance(final ShardRouting shard) { - if (shard.started() == false) { - // we can only rebalance started shards - return MoveDecision.NOT_TAKEN; - } - - Decision canRebalance = allocation.deciders().canRebalance(shard, allocation); - - sorter.reset(shard.getIndexName()); - ModelNode[] modelNodes = sorter.modelNodes; - final String currentNodeId = shard.currentNodeId(); - // find currently assigned node - ModelNode currentNode = null; - for (ModelNode node : modelNodes) { - if (node.getNodeId().equals(currentNodeId)) { - currentNode = node; - break; - } - } - assert currentNode != null : "currently assigned node could not be found"; - - // balance the shard, if a better node can be found - final String idxName = shard.getIndexName(); - final float currentWeight = weight.weight(this, currentNode, idxName); - final AllocationDeciders deciders = allocation.deciders(); - Type rebalanceDecisionType = Type.NO; - ModelNode assignedNode = null; - List> betterBalanceNodes = new ArrayList<>(); - List> sameBalanceNodes = new ArrayList<>(); - List> worseBalanceNodes = new ArrayList<>(); - for (ModelNode node : modelNodes) { - if (node == currentNode) { - continue; // skip over node we're currently allocated to - } - final Decision canAllocate = deciders.canAllocate(shard, node.getRoutingNode(), allocation); - // the current weight of the node in the cluster, as computed by the weight function; - // this is a comparison of the number of shards on this node to the number of shards - // that should be on each node on average (both taking the cluster as a whole into account - // as well as shards per index) - final float nodeWeight = weight.weight(this, node, idxName); - // if the node we are examining has a worse (higher) weight than the node the shard is - // assigned to, then there is no way moving the shard to the node with the worse weight - // can make the balance of the cluster better, so we check for that here - final boolean betterWeightThanCurrent = nodeWeight <= currentWeight; - boolean rebalanceConditionsMet = false; - if (betterWeightThanCurrent) { - // get the delta between the weights of the node we are checking and the node that holds the shard - float currentDelta = absDelta(nodeWeight, currentWeight); - // checks if the weight delta is above a certain threshold; if it is not above a certain threshold, - // then even though the node we are examining has a better weight and may make the cluster balance - // more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless - // the gains make it worth it, as defined by the threshold - boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false; - // calculate the delta of the weights of the two nodes if we were to add the shard to the - // node in question and move it away from the node that currently holds it. - // hence we add 2.0f to the weight delta - float proposedDelta = 2.0f + nodeWeight - currentWeight; - boolean betterWeightWithShardAdded = proposedDelta < currentDelta; - - rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded; - // if the simulated weight delta with the shard moved away is better than the weight delta - // with the shard remaining on the current node, and we are allowed to allocate to the - // node in question, then allow the rebalance - if (rebalanceConditionsMet && canAllocate.type().higherThan(rebalanceDecisionType)) { - // rebalance to the node, only will get overwritten if the decision here is to - // THROTTLE and we get a decision with YES on another node - rebalanceDecisionType = canAllocate.type(); - assignedNode = node; - } - } - Tuple nodeResult = Tuple.tuple(node, canAllocate); - if (rebalanceConditionsMet) { - betterBalanceNodes.add(nodeResult); - } else if (betterWeightThanCurrent) { - sameBalanceNodes.add(nodeResult); - } else { - worseBalanceNodes.add(nodeResult); - } - } - - int weightRanking = 0; - List nodeDecisions = new ArrayList<>(modelNodes.length - 1); - for (Tuple result : betterBalanceNodes) { - nodeDecisions.add( - new NodeAllocationResult( - result.v1().routingNode.node(), - AllocationDecision.fromDecisionType(result.v2().type()), - result.v2(), - ++weightRanking - ) - ); - } - int currentNodeWeightRanking = ++weightRanking; - for (Tuple result : sameBalanceNodes) { - AllocationDecision nodeDecision = result.v2().type() == Type.NO ? AllocationDecision.NO : AllocationDecision.WORSE_BALANCE; - nodeDecisions.add( - new NodeAllocationResult(result.v1().routingNode.node(), nodeDecision, result.v2(), currentNodeWeightRanking) - ); - } - for (Tuple result : worseBalanceNodes) { - AllocationDecision nodeDecision = result.v2().type() == Type.NO ? AllocationDecision.NO : AllocationDecision.WORSE_BALANCE; - nodeDecisions.add(new NodeAllocationResult(result.v1().routingNode.node(), nodeDecision, result.v2(), ++weightRanking)); - } - - if (canRebalance.type() != Type.YES || allocation.hasPendingAsyncFetch()) { - AllocationDecision allocationDecision = allocation.hasPendingAsyncFetch() - ? AllocationDecision.AWAITING_INFO - : AllocationDecision.fromDecisionType(canRebalance.type()); - return MoveDecision.cannotRebalance(canRebalance, allocationDecision, currentNodeWeightRanking, nodeDecisions); - } else { - return MoveDecision.rebalance( - canRebalance, - AllocationDecision.fromDecisionType(rebalanceDecisionType), - assignedNode != null ? assignedNode.routingNode.node() : null, - currentNodeWeightRanking, - nodeDecisions - ); - } - } - - /** - * Balances the nodes on the cluster model according to the weight - * function. The configured threshold is the minimum delta between the - * weight of the maximum node and the minimum node according to the - * {@link WeightFunction}. This weight is calculated per index to - * distribute shards evenly per index. The balancer tries to relocate - * shards only if the delta exceeds the threshold. In the default case - * the threshold is set to {@code 1.0} to enforce gaining relocation - * only, or in other words relocations that move the weight delta closer - * to {@code 0.0} - */ - private void balanceByWeights() { - final AllocationDeciders deciders = allocation.deciders(); - final ModelNode[] modelNodes = sorter.modelNodes; - final float[] weights = sorter.weights; - for (String index : buildWeightOrderedIndices()) { - IndexMetadata indexMetadata = metadata.index(index); - - // find nodes that have a shard of this index or where shards of this index are allowed to be allocated to, - // move these nodes to the front of modelNodes so that we can only balance based on these nodes - int relevantNodes = 0; - for (int i = 0; i < modelNodes.length; i++) { - ModelNode modelNode = modelNodes[i]; - if (modelNode.getIndex(index) != null - || deciders.canAllocate(indexMetadata, modelNode.getRoutingNode(), allocation).type() != Type.NO) { - // swap nodes at position i and relevantNodes - modelNodes[i] = modelNodes[relevantNodes]; - modelNodes[relevantNodes] = modelNode; - relevantNodes++; - } - } - - if (relevantNodes < 2) { - continue; - } - - sorter.reset(index, 0, relevantNodes); - int lowIdx = 0; - int highIdx = relevantNodes - 1; - while (true) { - final ModelNode minNode = modelNodes[lowIdx]; - final ModelNode maxNode = modelNodes[highIdx]; - advance_range: if (maxNode.numShards(index) > 0) { - final float delta = absDelta(weights[lowIdx], weights[highIdx]); - if (lessThan(delta, threshold)) { - if (lowIdx > 0 - && highIdx - 1 > 0 // is there a chance for a higher delta? - && (absDelta(weights[0], weights[highIdx - 1]) > threshold) // check if we need to break at all - ) { - /* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible - * due to some allocation decider restrictions like zone awareness. if one zone has for instance - * less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we - * can't move to the "lighter" shards since otherwise the zone would go over capacity. - * - * This break jumps straight to the condition below were we start moving from the high index towards - * the low index to shrink the window we are considering for balance from the other direction. - * (check shrinking the window from MAX to MIN) - * See #3580 - */ - break advance_range; - } - if (logger.isTraceEnabled()) { - logger.trace( - "Stop balancing index [{}] min_node [{}] weight: [{}]" + " max_node [{}] weight: [{}] delta: [{}]", - index, - maxNode.getNodeId(), - weights[highIdx], - minNode.getNodeId(), - weights[lowIdx], - delta - ); - } - break; - } - if (logger.isTraceEnabled()) { - logger.trace( - "Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]", - maxNode.getNodeId(), - weights[highIdx], - minNode.getNodeId(), - weights[lowIdx], - delta - ); - } - if (delta <= 1.0f) { - /* - * prevent relocations that only swap the weights of the two nodes. a relocation must bring us closer to the - * balance if we only achieve the same delta the relocation is useless - * - * NB this comment above was preserved from an earlier version but doesn't obviously describe the code today. We - * already know that lessThan(delta, threshold) == false and threshold defaults to 1.0, so by default we never - * hit this case anyway. - */ - logger.trace( - "Couldn't find shard to relocate from node [{}] to node [{}]", - maxNode.getNodeId(), - minNode.getNodeId() - ); - } else if (tryRelocateShard(minNode, maxNode, index)) { - /* - * TODO we could be a bit smarter here, we don't need to fully sort necessarily - * we could just find the place to insert linearly but the win might be minor - * compared to the added complexity - */ - weights[lowIdx] = sorter.weight(modelNodes[lowIdx]); - weights[highIdx] = sorter.weight(modelNodes[highIdx]); - sorter.sort(0, relevantNodes); - lowIdx = 0; - highIdx = relevantNodes - 1; - continue; - } - } - if (lowIdx < highIdx - 1) { - /* Shrinking the window from MIN to MAX - * we can't move from any shard from the min node lets move on to the next node - * and see if the threshold still holds. We either don't have any shard of this - * index on this node of allocation deciders prevent any relocation.*/ - lowIdx++; - } else if (lowIdx > 0) { - /* Shrinking the window from MAX to MIN - * now we go max to min since obviously we can't move anything to the max node - * lets pick the next highest */ - lowIdx = 0; - highIdx--; - } else { - /* we are done here, we either can't relocate anymore or we are balanced */ - break; - } - } - } - } - - /** - * This builds a initial index ordering where the indices are returned - * in most unbalanced first. We need this in order to prevent over - * allocations on added nodes from one index when the weight parameters - * for global balance overrule the index balance at an intermediate - * state. For example this can happen if we have 3 nodes and 3 indices - * with 3 primary and 1 replica shards. At the first stage all three nodes hold - * 2 shard for each index. Now we add another node and the first index - * is balanced moving three shards from two of the nodes over to the new node since it - * has no shards yet and global balance for the node is way below - * average. To re-balance we need to move shards back eventually likely - * to the nodes we relocated them from. - */ - private String[] buildWeightOrderedIndices() { - final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); - final float[] deltas = new float[indices.length]; - for (int i = 0; i < deltas.length; i++) { - sorter.reset(indices[i]); - deltas[i] = sorter.delta(); - } - new IntroSorter() { - - float pivotWeight; - - @Override - protected void swap(int i, int j) { - final String tmpIdx = indices[i]; - indices[i] = indices[j]; - indices[j] = tmpIdx; - final float tmpDelta = deltas[i]; - deltas[i] = deltas[j]; - deltas[j] = tmpDelta; - } - - @Override - protected int compare(int i, int j) { - return Float.compare(deltas[j], deltas[i]); - } - - @Override - protected void setPivot(int i) { - pivotWeight = deltas[i]; - } - - @Override - protected int comparePivot(int j) { - return Float.compare(deltas[j], pivotWeight); - } - }.sort(0, deltas.length); - - return indices; - } - - /** - * Checks if target node is ineligible and if so, adds to the list - * of ineligible target nodes - */ - private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) { - Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); - if (nodeLevelAllocationDecision.type() != Decision.Type.YES) { - inEligibleTargetNode.add(targetNode); - } - } - - /** - * Move started shards that can not be allocated to a node anymore - * - * For each shard to be moved this function executes a move operation - * to the minimal eligible node with respect to the - * weight function. If a shard is moved the shard will be set to - * {@link ShardRoutingState#RELOCATING} and a shadow instance of this - * shard is created with an incremented version in the state - * {@link ShardRoutingState#INITIALIZING}. - */ - public void moveShards() { - // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling - // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are - // offloading the shards. - - // Trying to eliminate target nodes so that we donot unnecessarily iterate over source nodes - // when no target is eligible - for (ModelNode currentNode : sorter.modelNodes) { - checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); - } - boolean primariesThrottled = false; - for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) { - // Verify if the cluster concurrent recoveries have been reached. - if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) { - logger.info( - "Cannot move any shard in the cluster due to cluster concurrent recoveries getting breached" - + ". Skipping shard iteration" - ); - return; - } - // Early terminate node interleaved shard iteration when no eligible target nodes are available - if (sorter.modelNodes.length == inEligibleTargetNode.size()) { - logger.info( - "Cannot move any shard in the cluster as there is no node on which shards can be allocated" - + ". Skipping shard iteration" - ); - return; - } - - ShardRouting shardRouting = it.next(); - - // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled - if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { - logger.info( - "Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards" - + "are being throttled. Skipping shard iteration" - ); - return; - } - - // Verify if the shard is allowed to move if outgoing recovery on the node hosting the primary shard - // is not being throttled. - Decision canMoveAwayDecision = allocation.deciders().canMoveAway(shardRouting, allocation); - if (canMoveAwayDecision.type() != Decision.Type.YES) { - if (logger.isDebugEnabled()) logger.debug("Cannot move away shard [{}] Skipping this shard", shardRouting); - if (shardRouting.primary() && canMoveAwayDecision.type() == Type.THROTTLE) { - primariesThrottled = true; - } - continue; - } - - final MoveDecision moveDecision = decideMove(shardRouting); - if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { - final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); - final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); - sourceNode.removeShard(shardRouting); - Tuple relocatingShards = routingNodes.relocateShard( - shardRouting, - targetNode.getNodeId(), - allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), - allocation.changes() - ); - targetNode.addShard(relocatingShards.v2()); - if (logger.isTraceEnabled()) { - logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); - } - - // Verifying if this node can be considered ineligible for further iterations - if (targetNode != null) { - checkAndAddInEligibleTargetNode(targetNode.getRoutingNode()); - } - } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { - logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); - } - } - } - - /** - * Makes a decision on whether to move a started shard to another node. The following rules apply - * to the {@link MoveDecision} return object: - * 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false. - * 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and - * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null. - * 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be - * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then - * {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null. - * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then - * {@link MoveDecision#getNodeDecisions} will have a non-null value. - */ - public MoveDecision decideMove(final ShardRouting shardRouting) { - if (shardRouting.started() == false) { - // we can only move started shards - return MoveDecision.NOT_TAKEN; - } - - final boolean explain = allocation.debugDecision(); - final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); - assert sourceNode != null && sourceNode.containsShard(shardRouting); - RoutingNode routingNode = sourceNode.getRoutingNode(); - Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if (canRemain.type() != Decision.Type.NO) { - return MoveDecision.stay(canRemain); - } - - sorter.reset(shardRouting.getIndexName()); - /* - * the sorter holds the minimum weight node first for the shards index. - * We now walk through the nodes until we find a node to allocate the shard. - * This is not guaranteed to be balanced after this operation we still try best effort to - * allocate on the minimal eligible node. - */ - Type bestDecision = Type.NO; - RoutingNode targetNode = null; - final List nodeExplanationMap = explain ? new ArrayList<>() : null; - int weightRanking = 0; - int targetNodeProcessed = 0; - for (ModelNode currentNode : sorter.modelNodes) { - if (currentNode != sourceNode) { - RoutingNode target = currentNode.getRoutingNode(); - if (!explain && inEligibleTargetNode.contains(target)) continue; - // don't use canRebalance as we want hard filtering rules to apply. See #17698 - if (!explain) { - // If we cannot allocate any shard to node marking it in eligible - Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(target, allocation); - if (nodeLevelAllocationDecision.type() != Decision.Type.YES) { - inEligibleTargetNode.add(currentNode.getRoutingNode()); - continue; - } - } - targetNodeProcessed++; - // don't use canRebalance as we want hard filtering rules to apply. See #17698 - Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); - if (explain) { - nodeExplanationMap.add( - new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking) - ); - } - // TODO maybe we can respect throttling here too? - if (allocationDecision.type().higherThan(bestDecision)) { - bestDecision = allocationDecision.type(); - if (bestDecision == Type.YES) { - targetNode = target; - if (explain == false) { - // we are not in explain mode and already have a YES decision on the best weighted node, - // no need to continue iterating - break; - } - } - } - } - } - - return MoveDecision.cannotRemain( - canRemain, - AllocationDecision.fromDecisionType(bestDecision), - targetNode != null ? targetNode.node() : null, - nodeExplanationMap - ); - } - - /** - * Builds the internal model from all shards in the given - * {@link Iterable}. All shards in the {@link Iterable} must be assigned - * to a node. This method will skip shards in the state - * {@link ShardRoutingState#RELOCATING} since each relocating shard has - * a shadow shard in the state {@link ShardRoutingState#INITIALIZING} - * on the target node which we respect during the allocation / balancing - * process. In short, this method recreates the status-quo in the cluster. - */ - private Map buildModelFromAssigned() { - Map nodes = new HashMap<>(); - for (RoutingNode rn : routingNodes) { - ModelNode node = new ModelNode(rn); - nodes.put(rn.nodeId(), node); - for (ShardRouting shard : rn) { - assert rn.nodeId().equals(shard.currentNodeId()); - /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ - if (shard.state() != RELOCATING) { - node.addShard(shard); - if (logger.isTraceEnabled()) { - logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); - } - } - } - } - return nodes; - } - - /** - * Allocates all given shards on the minimal eligible node for the shards index - * with respect to the weight function. All given shards must be unassigned. - */ - private void allocateUnassigned() { - RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); - assert !nodes.isEmpty(); - if (logger.isTraceEnabled()) { - logger.trace("Start allocating unassigned shards"); - } - if (unassigned.isEmpty()) { - return; - } - - /* - * TODO: We could be smarter here and group the shards by index and then - * use the sorter to save some iterations. - */ - final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation); - final Comparator comparator = (o1, o2) -> { - if (o1.primary() ^ o2.primary()) { - return o1.primary() ? -1 : 1; - } - final int indexCmp; - if ((indexCmp = o1.getIndexName().compareTo(o2.getIndexName())) == 0) { - return o1.getId() - o2.getId(); - } - // this comparator is more expensive than all the others up there - // that's why it's added last even though it could be easier to read - // if we'd apply it earlier. this comparator will only differentiate across - // indices all shards of the same index is treated equally. - final int secondary = secondaryComparator.compare(o1, o2); - return secondary == 0 ? indexCmp : secondary; - }; - /* - * we use 2 arrays and move replicas to the second array once we allocated an identical - * replica in the current iteration to make sure all indices get allocated in the same manner. - * The arrays are sorted by primaries first and then by index and shard ID so a 2 indices with - * 2 replica and 1 shard would look like: - * [(0,P,IDX1), (0,P,IDX2), (0,R,IDX1), (0,R,IDX1), (0,R,IDX2), (0,R,IDX2)] - * if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with - * the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned. - */ - ShardRouting[] primary = unassigned.drain(); - ShardRouting[] secondary = new ShardRouting[primary.length]; - int secondaryLength = 0; - int primaryLength = primary.length; - ArrayUtil.timSort(primary, comparator); - do { - for (int i = 0; i < primaryLength; i++) { - ShardRouting shard = primary[i]; - final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard); - final String assignedNodeId = allocationDecision.getTargetNode() != null - ? allocationDecision.getTargetNode().getId() - : null; - final ModelNode minNode = assignedNodeId != null ? nodes.get(assignedNodeId) : null; - - if (allocationDecision.getAllocationDecision() == AllocationDecision.YES) { - if (logger.isTraceEnabled()) { - logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); - } - - final long shardSize = DiskThresholdDecider.getExpectedShardSize( - shard, - ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, - allocation.clusterInfo(), - allocation.snapshotShardSizeInfo(), - allocation.metadata(), - allocation.routingTable() - ); - shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes()); - minNode.addShard(shard); - if (!shard.primary()) { - // copy over the same replica shards to the secondary array so they will get allocated - // in a subsequent iteration, allowing replicas of other shards to be allocated first - while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) { - secondary[secondaryLength++] = primary[++i]; - } - } - } else { - // did *not* receive a YES decision - if (logger.isTraceEnabled()) { - logger.trace( - "No eligible node found to assign shard [{}] allocation_status [{}]", - shard, - allocationDecision.getAllocationStatus() - ); - } - - if (minNode != null) { - // throttle decision scenario - assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED; - final long shardSize = DiskThresholdDecider.getExpectedShardSize( - shard, - ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, - allocation.clusterInfo(), - allocation.snapshotShardSizeInfo(), - allocation.metadata(), - allocation.routingTable() - ); - minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize)); - } else { - if (logger.isTraceEnabled()) { - logger.trace("No Node found to assign shard [{}]", shard); - } - } - - unassigned.ignoreShard(shard, allocationDecision.getAllocationStatus(), allocation.changes()); - if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas - while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) { - unassigned.ignoreShard(primary[++i], allocationDecision.getAllocationStatus(), allocation.changes()); - } - } - } - } - primaryLength = secondaryLength; - ShardRouting[] tmp = primary; - primary = secondary; - secondary = tmp; - secondaryLength = 0; - } while (primaryLength > 0); - // clear everything we have either added it or moved to ignoreUnassigned - } - - /** - * Make a decision for allocating an unassigned shard. This method returns a two values in a tuple: the - * first value is the {@link Decision} taken to allocate the unassigned shard, the second value is the - * {@link ModelNode} representing the node that the shard should be assigned to. If the decision returned - * is of type {@link Type#NO}, then the assigned node will be null. - */ - private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { - if (shard.assignedToNode()) { - // we only make decisions for unassigned shards here - return AllocateUnassignedDecision.NOT_TAKEN; - } - - final boolean explain = allocation.debugDecision(); - Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation); - if (shardLevelDecision.type() == Type.NO && explain == false) { - // NO decision for allocating the shard, irrespective of any particular node, so exit early - return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); - } - - /* find an node with minimal weight we can allocate on*/ - float minWeight = Float.POSITIVE_INFINITY; - ModelNode minNode = null; - Decision decision = null; - /* Don't iterate over an identity hashset here the - * iteration order is different for each run and makes testing hard */ - Map nodeExplanationMap = explain ? new HashMap<>() : null; - List> nodeWeights = explain ? new ArrayList<>() : null; - for (ModelNode node : nodes.values()) { - if (node.containsShard(shard) && explain == false) { - // decision is NO without needing to check anything further, so short circuit - continue; - } - - // weight of this index currently on the node - float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName()); - // moving the shard would not improve the balance, and we are not in explain mode, so short circuit - if (currentWeight > minWeight && explain == false) { - continue; - } - - Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation); - if (explain) { - nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); - nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); - } - if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { - final boolean updateMinNode; - if (currentWeight == minWeight) { - /* we have an equal weight tie breaking: - * 1. if one decision is YES prefer it - * 2. prefer the node that holds the primary for this index with the next id in the ring ie. - * for the 3 shards 2 replica case we try to build up: - * 1 2 0 - * 2 0 1 - * 0 1 2 - * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater - * than the id of the shard we need to assign. This works find when new indices are created since - * primaries are added first and we only add one shard set a time in this algorithm. - */ - if (currentDecision.type() == decision.type()) { - final int repId = shard.id(); - final int nodeHigh = node.highestPrimary(shard.index().getName()); - final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); - updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) - && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId)); - } else { - updateMinNode = currentDecision.type() == Type.YES; - } - } else { - updateMinNode = currentWeight < minWeight; - } - if (updateMinNode) { - minNode = node; - minWeight = currentWeight; - decision = currentDecision; - } - } - } - if (decision == null) { - // decision was not set and a node was not assigned, so treat it as a NO decision - decision = Decision.NO; - } - List nodeDecisions = null; - if (explain) { - nodeDecisions = new ArrayList<>(); - // fill in the correct weight ranking, once we've been through all nodes - nodeWeights.sort((nodeWeight1, nodeWeight2) -> Float.compare(nodeWeight1.v2(), nodeWeight2.v2())); - int weightRanking = 0; - for (Tuple nodeWeight : nodeWeights) { - NodeAllocationResult current = nodeExplanationMap.get(nodeWeight.v1()); - nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking)); - } - } - return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.routingNode.node() : null, nodeDecisions); - } - - private static final Comparator BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed(); - - /** - * Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the - * balance model. Iff this method returns a true the relocation has already been executed on the - * simulation model as well as on the cluster. - */ - private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx) { - final ModelIndex index = maxNode.getIndex(idx); - if (index != null) { - logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); - final Iterable shardRoutings = StreamSupport.stream(index.spliterator(), false) - .filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway - .filter(maxNode::containsShard) - .sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic - ::iterator; - - final AllocationDeciders deciders = allocation.deciders(); - for (ShardRouting shard : shardRoutings) { - final Decision rebalanceDecision = deciders.canRebalance(shard, allocation); - if (rebalanceDecision.type() == Type.NO) { - continue; - } - final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation); - if (allocationDecision.type() == Type.NO) { - continue; - } - - final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); - - maxNode.removeShard(shard); - long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); - - if (decision.type() == Type.YES) { - /* only allocate on the cluster if we are not throttled */ - logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId()); - minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1()); - return true; - } else { - /* allocate on the model even if throttled */ - logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId()); - assert decision.type() == Type.THROTTLE; - minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize)); - return false; - } - } - } - logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); - return false; - } - - } - /** * A model node. * @@ -1277,6 +353,25 @@ public boolean containsShard(ShardRouting shard) { } + /** + * A {@link Balancer} used by the {@link BalancedShardsAllocator} to perform allocation operations + * @deprecated As of 2.4.0, replaced by {@link LocalShardsBalancer} + * + * @opensearch.internal + */ + @Deprecated + public static class Balancer extends LocalShardsBalancer { + public Balancer( + Logger logger, + RoutingAllocation allocation, + boolean movePrimaryFirst, + BalancedShardsAllocator.WeightFunction weight, + float threshold + ) { + super(logger, allocation, movePrimaryFirst, weight, threshold); + } + } + /** * A model index. * @@ -1346,10 +441,10 @@ static final class NodeSorter extends IntroSorter { final float[] weights; private final WeightFunction function; private String index; - private final Balancer balancer; + private final ShardsBalancer balancer; private float pivotWeight; - NodeSorter(ModelNode[] modelNodes, WeightFunction function, Balancer balancer) { + NodeSorter(ModelNode[] modelNodes, WeightFunction function, ShardsBalancer balancer) { this.function = function; this.balancer = balancer; this.modelNodes = modelNodes; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java new file mode 100644 index 0000000000000..53d7c827392d5 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -0,0 +1,967 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.allocator; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IntroSorter; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.AllocationDecision; +import org.opensearch.cluster.routing.allocation.MoveDecision; +import org.opensearch.cluster.routing.allocation.NodeAllocationResult; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.opensearch.common.collect.Tuple; +import org.opensearch.gateway.PriorityComparator; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.StreamSupport; + +import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; + +/** + * A {@link LocalShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations + * for local shards within the cluster. + * + * @opensearch.internal + */ +public class LocalShardsBalancer extends ShardsBalancer { + private final Logger logger; + private final Map nodes; + private final RoutingAllocation allocation; + private final RoutingNodes routingNodes; + private final boolean movePrimaryFirst; + private final BalancedShardsAllocator.WeightFunction weight; + + private final float threshold; + private final Metadata metadata; + private final float avgShardsPerNode; + private final BalancedShardsAllocator.NodeSorter sorter; + private final Set inEligibleTargetNode; + + public LocalShardsBalancer( + Logger logger, + RoutingAllocation allocation, + boolean movePrimaryFirst, + BalancedShardsAllocator.WeightFunction weight, + float threshold + ) { + this.logger = logger; + this.allocation = allocation; + this.movePrimaryFirst = movePrimaryFirst; + this.weight = weight; + this.threshold = threshold; + this.routingNodes = allocation.routingNodes(); + this.metadata = allocation.metadata(); + avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); + nodes = Collections.unmodifiableMap(buildModelFromAssigned()); + sorter = newNodeSorter(); + inEligibleTargetNode = new HashSet<>(); + } + + /** + * Returns an array view on the nodes in the balancer. Nodes should not be removed from this list. + */ + private BalancedShardsAllocator.ModelNode[] nodesArray() { + return nodes.values().toArray(new BalancedShardsAllocator.ModelNode[nodes.size()]); + } + + /** + * Returns the average of shards per node for the given index + */ + @Override + public float avgShardsPerNode(String index) { + return ((float) metadata.index(index).getTotalNumberOfShards()) / nodes.size(); + } + + /** + * Returns the global average of shards per node + */ + @Override + public float avgShardsPerNode() { + return avgShardsPerNode; + } + + /** + * Returns a new {@link BalancedShardsAllocator.NodeSorter} that sorts the nodes based on their + * current weight with respect to the index passed to the sorter. The + * returned sorter is not sorted. Use {@link BalancedShardsAllocator.NodeSorter#reset(String)} + * to sort based on an index. + */ + private BalancedShardsAllocator.NodeSorter newNodeSorter() { + return new BalancedShardsAllocator.NodeSorter(nodesArray(), weight, this); + } + + /** + * The absolute value difference between two weights. + */ + private static float absDelta(float lower, float higher) { + assert higher >= lower : higher + " lt " + lower + " but was expected to be gte"; + return Math.abs(higher - lower); + } + + /** + * Returns {@code true} iff the weight delta between two nodes is under a defined threshold. + * See {@link BalancedShardsAllocator#THRESHOLD_SETTING} for defining the threshold. + */ + private static boolean lessThan(float delta, float threshold) { + /* deltas close to the threshold are "rounded" to the threshold manually + to prevent floating point problems if the delta is very close to the + threshold ie. 1.000000002 which can trigger unnecessary balance actions*/ + return delta <= (threshold + 0.001f); + } + + /** + * Balances the nodes on the cluster model according to the weight function. + * The actual balancing is delegated to {@link #balanceByWeights()} + */ + @Override + void balance() { + if (logger.isTraceEnabled()) { + logger.trace("Start balancing cluster"); + } + if (allocation.hasPendingAsyncFetch()) { + /* + * see https://github.com/elastic/elasticsearch/issues/14387 + * if we allow rebalance operations while we are still fetching shard store data + * we might end up with unnecessary rebalance operations which can be super confusion/frustrating + * since once the fetches come back we might just move all the shards back again. + * Therefore we only do a rebalance if we have fetched all information. + */ + logger.debug("skipping rebalance due to in-flight shard/store fetches"); + return; + } + if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { + logger.trace("skipping rebalance as it is disabled"); + return; + } + if (nodes.size() < 2) { /* skip if we only have one node */ + logger.trace("skipping rebalance as single node only"); + return; + } + balanceByWeights(); + } + + /** + * Makes a decision about moving a single shard to a different node to form a more + * optimally balanced cluster. This method is invoked from the cluster allocation + * explain API only. + */ + @Override + MoveDecision decideRebalance(final ShardRouting shard) { + if (shard.started() == false) { + // we can only rebalance started shards + return MoveDecision.NOT_TAKEN; + } + + Decision canRebalance = allocation.deciders().canRebalance(shard, allocation); + + sorter.reset(shard.getIndexName()); + BalancedShardsAllocator.ModelNode[] modelNodes = sorter.modelNodes; + final String currentNodeId = shard.currentNodeId(); + // find currently assigned node + BalancedShardsAllocator.ModelNode currentNode = null; + for (BalancedShardsAllocator.ModelNode node : modelNodes) { + if (node.getNodeId().equals(currentNodeId)) { + currentNode = node; + break; + } + } + assert currentNode != null : "currently assigned node could not be found"; + + // balance the shard, if a better node can be found + final String idxName = shard.getIndexName(); + final float currentWeight = weight.weight(this, currentNode, idxName); + final AllocationDeciders deciders = allocation.deciders(); + Decision.Type rebalanceDecisionType = Decision.Type.NO; + BalancedShardsAllocator.ModelNode assignedNode = null; + List> betterBalanceNodes = new ArrayList<>(); + List> sameBalanceNodes = new ArrayList<>(); + List> worseBalanceNodes = new ArrayList<>(); + for (BalancedShardsAllocator.ModelNode node : modelNodes) { + if (node == currentNode) { + continue; // skip over node we're currently allocated to + } + final Decision canAllocate = deciders.canAllocate(shard, node.getRoutingNode(), allocation); + // the current weight of the node in the cluster, as computed by the weight function; + // this is a comparison of the number of shards on this node to the number of shards + // that should be on each node on average (both taking the cluster as a whole into account + // as well as shards per index) + final float nodeWeight = weight.weight(this, node, idxName); + // if the node we are examining has a worse (higher) weight than the node the shard is + // assigned to, then there is no way moving the shard to the node with the worse weight + // can make the balance of the cluster better, so we check for that here + final boolean betterWeightThanCurrent = nodeWeight <= currentWeight; + boolean rebalanceConditionsMet = false; + if (betterWeightThanCurrent) { + // get the delta between the weights of the node we are checking and the node that holds the shard + float currentDelta = absDelta(nodeWeight, currentWeight); + // checks if the weight delta is above a certain threshold; if it is not above a certain threshold, + // then even though the node we are examining has a better weight and may make the cluster balance + // more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless + // the gains make it worth it, as defined by the threshold + boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false; + // calculate the delta of the weights of the two nodes if we were to add the shard to the + // node in question and move it away from the node that currently holds it. + // hence we add 2.0f to the weight delta + float proposedDelta = 2.0f + nodeWeight - currentWeight; + boolean betterWeightWithShardAdded = proposedDelta < currentDelta; + + rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded; + // if the simulated weight delta with the shard moved away is better than the weight delta + // with the shard remaining on the current node, and we are allowed to allocate to the + // node in question, then allow the rebalance + if (rebalanceConditionsMet && canAllocate.type().higherThan(rebalanceDecisionType)) { + // rebalance to the node, only will get overwritten if the decision here is to + // THROTTLE and we get a decision with YES on another node + rebalanceDecisionType = canAllocate.type(); + assignedNode = node; + } + } + Tuple nodeResult = Tuple.tuple(node, canAllocate); + if (rebalanceConditionsMet) { + betterBalanceNodes.add(nodeResult); + } else if (betterWeightThanCurrent) { + sameBalanceNodes.add(nodeResult); + } else { + worseBalanceNodes.add(nodeResult); + } + } + + int weightRanking = 0; + List nodeDecisions = new ArrayList<>(modelNodes.length - 1); + for (Tuple result : betterBalanceNodes) { + nodeDecisions.add( + new NodeAllocationResult( + result.v1().getRoutingNode().node(), + AllocationDecision.fromDecisionType(result.v2().type()), + result.v2(), + ++weightRanking + ) + ); + } + int currentNodeWeightRanking = ++weightRanking; + for (Tuple result : sameBalanceNodes) { + AllocationDecision nodeDecision = result.v2().type() == Decision.Type.NO + ? AllocationDecision.NO + : AllocationDecision.WORSE_BALANCE; + nodeDecisions.add( + new NodeAllocationResult(result.v1().getRoutingNode().node(), nodeDecision, result.v2(), currentNodeWeightRanking) + ); + } + for (Tuple result : worseBalanceNodes) { + AllocationDecision nodeDecision = result.v2().type() == Decision.Type.NO + ? AllocationDecision.NO + : AllocationDecision.WORSE_BALANCE; + nodeDecisions.add(new NodeAllocationResult(result.v1().getRoutingNode().node(), nodeDecision, result.v2(), ++weightRanking)); + } + + if (canRebalance.type() != Decision.Type.YES || allocation.hasPendingAsyncFetch()) { + AllocationDecision allocationDecision = allocation.hasPendingAsyncFetch() + ? AllocationDecision.AWAITING_INFO + : AllocationDecision.fromDecisionType(canRebalance.type()); + return MoveDecision.cannotRebalance(canRebalance, allocationDecision, currentNodeWeightRanking, nodeDecisions); + } else { + return MoveDecision.rebalance( + canRebalance, + AllocationDecision.fromDecisionType(rebalanceDecisionType), + assignedNode != null ? assignedNode.getRoutingNode().node() : null, + currentNodeWeightRanking, + nodeDecisions + ); + } + } + + /** + * Balances the nodes on the cluster model according to the weight + * function. The configured threshold is the minimum delta between the + * weight of the maximum node and the minimum node according to the + * {@link BalancedShardsAllocator.WeightFunction}. This weight is calculated per index to + * distribute shards evenly per index. The balancer tries to relocate + * shards only if the delta exceeds the threshold. In the default case + * the threshold is set to {@code 1.0} to enforce gaining relocation + * only, or in other words relocations that move the weight delta closer + * to {@code 0.0} + */ + private void balanceByWeights() { + final AllocationDeciders deciders = allocation.deciders(); + final BalancedShardsAllocator.ModelNode[] modelNodes = sorter.modelNodes; + final float[] weights = sorter.weights; + for (String index : buildWeightOrderedIndices()) { + IndexMetadata indexMetadata = metadata.index(index); + + // find nodes that have a shard of this index or where shards of this index are allowed to be allocated to, + // move these nodes to the front of modelNodes so that we can only balance based on these nodes + int relevantNodes = 0; + for (int i = 0; i < modelNodes.length; i++) { + BalancedShardsAllocator.ModelNode modelNode = modelNodes[i]; + if (modelNode.getIndex(index) != null + || deciders.canAllocate(indexMetadata, modelNode.getRoutingNode(), allocation).type() != Decision.Type.NO) { + // swap nodes at position i and relevantNodes + modelNodes[i] = modelNodes[relevantNodes]; + modelNodes[relevantNodes] = modelNode; + relevantNodes++; + } + } + + if (relevantNodes < 2) { + continue; + } + + sorter.reset(index, 0, relevantNodes); + int lowIdx = 0; + int highIdx = relevantNodes - 1; + while (true) { + final BalancedShardsAllocator.ModelNode minNode = modelNodes[lowIdx]; + final BalancedShardsAllocator.ModelNode maxNode = modelNodes[highIdx]; + advance_range: if (maxNode.numShards(index) > 0) { + final float delta = absDelta(weights[lowIdx], weights[highIdx]); + if (lessThan(delta, threshold)) { + if (lowIdx > 0 + && highIdx - 1 > 0 // is there a chance for a higher delta? + && (absDelta(weights[0], weights[highIdx - 1]) > threshold) // check if we need to break at all + ) { + /* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible + * due to some allocation decider restrictions like zone awareness. if one zone has for instance + * less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we + * can't move to the "lighter" shards since otherwise the zone would go over capacity. + * + * This break jumps straight to the condition below were we start moving from the high index towards + * the low index to shrink the window we are considering for balance from the other direction. + * (check shrinking the window from MAX to MIN) + * See #3580 + */ + break advance_range; + } + if (logger.isTraceEnabled()) { + logger.trace( + "Stop balancing index [{}] min_node [{}] weight: [{}]" + " max_node [{}] weight: [{}] delta: [{}]", + index, + maxNode.getNodeId(), + weights[highIdx], + minNode.getNodeId(), + weights[lowIdx], + delta + ); + } + break; + } + if (logger.isTraceEnabled()) { + logger.trace( + "Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]", + maxNode.getNodeId(), + weights[highIdx], + minNode.getNodeId(), + weights[lowIdx], + delta + ); + } + if (delta <= 1.0f) { + /* + * prevent relocations that only swap the weights of the two nodes. a relocation must bring us closer to the + * balance if we only achieve the same delta the relocation is useless + * + * NB this comment above was preserved from an earlier version but doesn't obviously describe the code today. We + * already know that lessThan(delta, threshold) == false and threshold defaults to 1.0, so by default we never + * hit this case anyway. + */ + logger.trace( + "Couldn't find shard to relocate from node [{}] to node [{}]", + maxNode.getNodeId(), + minNode.getNodeId() + ); + } else if (tryRelocateShard(minNode, maxNode, index)) { + /* + * TODO we could be a bit smarter here, we don't need to fully sort necessarily + * we could just find the place to insert linearly but the win might be minor + * compared to the added complexity + */ + weights[lowIdx] = sorter.weight(modelNodes[lowIdx]); + weights[highIdx] = sorter.weight(modelNodes[highIdx]); + sorter.sort(0, relevantNodes); + lowIdx = 0; + highIdx = relevantNodes - 1; + continue; + } + } + if (lowIdx < highIdx - 1) { + /* Shrinking the window from MIN to MAX + * we can't move from any shard from the min node lets move on to the next node + * and see if the threshold still holds. We either don't have any shard of this + * index on this node of allocation deciders prevent any relocation.*/ + lowIdx++; + } else if (lowIdx > 0) { + /* Shrinking the window from MAX to MIN + * now we go max to min since obviously we can't move anything to the max node + * lets pick the next highest */ + lowIdx = 0; + highIdx--; + } else { + /* we are done here, we either can't relocate anymore or we are balanced */ + break; + } + } + } + } + + /** + * This builds a initial index ordering where the indices are returned + * in most unbalanced first. We need this in order to prevent over + * allocations on added nodes from one index when the weight parameters + * for global balance overrule the index balance at an intermediate + * state. For example this can happen if we have 3 nodes and 3 indices + * with 3 primary and 1 replica shards. At the first stage all three nodes hold + * 2 shard for each index. Now we add another node and the first index + * is balanced moving three shards from two of the nodes over to the new node since it + * has no shards yet and global balance for the node is way below + * average. To re-balance we need to move shards back eventually likely + * to the nodes we relocated them from. + */ + private String[] buildWeightOrderedIndices() { + final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); + final float[] deltas = new float[indices.length]; + for (int i = 0; i < deltas.length; i++) { + sorter.reset(indices[i]); + deltas[i] = sorter.delta(); + } + new IntroSorter() { + + float pivotWeight; + + @Override + protected void swap(int i, int j) { + final String tmpIdx = indices[i]; + indices[i] = indices[j]; + indices[j] = tmpIdx; + final float tmpDelta = deltas[i]; + deltas[i] = deltas[j]; + deltas[j] = tmpDelta; + } + + @Override + protected int compare(int i, int j) { + return Float.compare(deltas[j], deltas[i]); + } + + @Override + protected void setPivot(int i) { + pivotWeight = deltas[i]; + } + + @Override + protected int comparePivot(int j) { + return Float.compare(deltas[j], pivotWeight); + } + }.sort(0, deltas.length); + + return indices; + } + + /** + * Checks if target node is ineligible and if so, adds to the list + * of ineligible target nodes + */ + private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) { + Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeLevelAllocationDecision.type() != Decision.Type.YES) { + inEligibleTargetNode.add(targetNode); + } + } + + /** + * Move started shards that can not be allocated to a node anymore + * + * For each shard to be moved this function executes a move operation + * to the minimal eligible node with respect to the + * weight function. If a shard is moved the shard will be set to + * {@link ShardRoutingState#RELOCATING} and a shadow instance of this + * shard is created with an incremented version in the state + * {@link ShardRoutingState#INITIALIZING}. + */ + @Override + void moveShards() { + // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling + // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are + // offloading the shards. + + // Trying to eliminate target nodes so that we donot unnecessarily iterate over source nodes + // when no target is eligible + for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) { + checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); + } + boolean primariesThrottled = false; + for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) { + // Verify if the cluster concurrent recoveries have been reached. + if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) { + logger.info( + "Cannot move any shard in the cluster due to cluster concurrent recoveries getting breached" + + ". Skipping shard iteration" + ); + return; + } + // Early terminate node interleaved shard iteration when no eligible target nodes are available + if (sorter.modelNodes.length == inEligibleTargetNode.size()) { + logger.info( + "Cannot move any shard in the cluster as there is no node on which shards can be allocated" + + ". Skipping shard iteration" + ); + return; + } + + ShardRouting shardRouting = it.next(); + + // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled + if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { + logger.info( + "Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards" + + "are being throttled. Skipping shard iteration" + ); + return; + } + + // Verify if the shard is allowed to move if outgoing recovery on the node hosting the primary shard + // is not being throttled. + Decision canMoveAwayDecision = allocation.deciders().canMoveAway(shardRouting, allocation); + if (canMoveAwayDecision.type() != Decision.Type.YES) { + if (logger.isDebugEnabled()) logger.debug("Cannot move away shard [{}] Skipping this shard", shardRouting); + if (shardRouting.primary() && canMoveAwayDecision.type() == Decision.Type.THROTTLE) { + primariesThrottled = true; + } + continue; + } + + final MoveDecision moveDecision = decideMove(shardRouting); + if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { + final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + final BalancedShardsAllocator.ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); + sourceNode.removeShard(shardRouting); + Tuple relocatingShards = routingNodes.relocateShard( + shardRouting, + targetNode.getNodeId(), + allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + allocation.changes() + ); + targetNode.addShard(relocatingShards.v2()); + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); + } + + // Verifying if this node can be considered ineligible for further iterations + if (targetNode != null) { + checkAndAddInEligibleTargetNode(targetNode.getRoutingNode()); + } + } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { + logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); + } + } + } + + /** + * Makes a decision on whether to move a started shard to another node. The following rules apply + * to the {@link MoveDecision} return object: + * 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false. + * 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and + * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null. + * 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be + * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then + * {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null. + * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then + * {@link MoveDecision#getNodeDecisions} will have a non-null value. + */ + @Override + MoveDecision decideMove(final ShardRouting shardRouting) { + if (shardRouting.started() == false) { + // we can only move started shards + return MoveDecision.NOT_TAKEN; + } + + final boolean explain = allocation.debugDecision(); + final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + assert sourceNode != null && sourceNode.containsShard(shardRouting); + RoutingNode routingNode = sourceNode.getRoutingNode(); + Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + if (canRemain.type() != Decision.Type.NO) { + return MoveDecision.stay(canRemain); + } + + sorter.reset(shardRouting.getIndexName()); + /* + * the sorter holds the minimum weight node first for the shards index. + * We now walk through the nodes until we find a node to allocate the shard. + * This is not guaranteed to be balanced after this operation we still try best effort to + * allocate on the minimal eligible node. + */ + Decision.Type bestDecision = Decision.Type.NO; + RoutingNode targetNode = null; + final List nodeExplanationMap = explain ? new ArrayList<>() : null; + int weightRanking = 0; + int targetNodeProcessed = 0; + for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) { + if (currentNode != sourceNode) { + RoutingNode target = currentNode.getRoutingNode(); + if (!explain && inEligibleTargetNode.contains(target)) continue; + // don't use canRebalance as we want hard filtering rules to apply. See #17698 + if (!explain) { + // If we cannot allocate any shard to node marking it in eligible + Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(target, allocation); + if (nodeLevelAllocationDecision.type() != Decision.Type.YES) { + inEligibleTargetNode.add(currentNode.getRoutingNode()); + continue; + } + } + targetNodeProcessed++; + // don't use canRebalance as we want hard filtering rules to apply. See #17698 + Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); + if (explain) { + nodeExplanationMap.add( + new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking) + ); + } + // TODO maybe we can respect throttling here too? + if (allocationDecision.type().higherThan(bestDecision)) { + bestDecision = allocationDecision.type(); + if (bestDecision == Decision.Type.YES) { + targetNode = target; + if (explain == false) { + // we are not in explain mode and already have a YES decision on the best weighted node, + // no need to continue iterating + break; + } + } + } + } + } + + return MoveDecision.cannotRemain( + canRemain, + AllocationDecision.fromDecisionType(bestDecision), + targetNode != null ? targetNode.node() : null, + nodeExplanationMap + ); + } + + /** + * Builds the internal model from all shards in the given + * {@link Iterable}. All shards in the {@link Iterable} must be assigned + * to a node. This method will skip shards in the state + * {@link ShardRoutingState#RELOCATING} since each relocating shard has + * a shadow shard in the state {@link ShardRoutingState#INITIALIZING} + * on the target node which we respect during the allocation / balancing + * process. In short, this method recreates the status-quo in the cluster. + */ + private Map buildModelFromAssigned() { + Map nodes = new HashMap<>(); + for (RoutingNode rn : routingNodes) { + BalancedShardsAllocator.ModelNode node = new BalancedShardsAllocator.ModelNode(rn); + nodes.put(rn.nodeId(), node); + for (ShardRouting shard : rn) { + assert rn.nodeId().equals(shard.currentNodeId()); + /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ + if (shard.state() != RELOCATING) { + node.addShard(shard); + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); + } + } + } + } + return nodes; + } + + /** + * Allocates all given shards on the minimal eligible node for the shards index + * with respect to the weight function. All given shards must be unassigned. + */ + @Override + void allocateUnassigned() { + RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); + assert !nodes.isEmpty(); + if (logger.isTraceEnabled()) { + logger.trace("Start allocating unassigned shards"); + } + if (unassigned.isEmpty()) { + return; + } + + /* + * TODO: We could be smarter here and group the shards by index and then + * use the sorter to save some iterations. + */ + final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation); + final Comparator comparator = (o1, o2) -> { + if (o1.primary() ^ o2.primary()) { + return o1.primary() ? -1 : 1; + } + final int indexCmp; + if ((indexCmp = o1.getIndexName().compareTo(o2.getIndexName())) == 0) { + return o1.getId() - o2.getId(); + } + // this comparator is more expensive than all the others up there + // that's why it's added last even though it could be easier to read + // if we'd apply it earlier. this comparator will only differentiate across + // indices all shards of the same index is treated equally. + final int secondary = secondaryComparator.compare(o1, o2); + return secondary == 0 ? indexCmp : secondary; + }; + /* + * we use 2 arrays and move replicas to the second array once we allocated an identical + * replica in the current iteration to make sure all indices get allocated in the same manner. + * The arrays are sorted by primaries first and then by index and shard ID so a 2 indices with + * 2 replica and 1 shard would look like: + * [(0,P,IDX1), (0,P,IDX2), (0,R,IDX1), (0,R,IDX1), (0,R,IDX2), (0,R,IDX2)] + * if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with + * the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned. + */ + ShardRouting[] primary = unassigned.drain(); + ShardRouting[] secondary = new ShardRouting[primary.length]; + int secondaryLength = 0; + int primaryLength = primary.length; + ArrayUtil.timSort(primary, comparator); + do { + for (int i = 0; i < primaryLength; i++) { + ShardRouting shard = primary[i]; + final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard); + final String assignedNodeId = allocationDecision.getTargetNode() != null + ? allocationDecision.getTargetNode().getId() + : null; + final BalancedShardsAllocator.ModelNode minNode = assignedNodeId != null ? nodes.get(assignedNodeId) : null; + + if (allocationDecision.getAllocationDecision() == AllocationDecision.YES) { + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); + } + + final long shardSize = DiskThresholdDecider.getExpectedShardSize( + shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), + allocation.snapshotShardSizeInfo(), + allocation.metadata(), + allocation.routingTable() + ); + shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes()); + minNode.addShard(shard); + if (!shard.primary()) { + // copy over the same replica shards to the secondary array so they will get allocated + // in a subsequent iteration, allowing replicas of other shards to be allocated first + while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) { + secondary[secondaryLength++] = primary[++i]; + } + } + } else { + // did *not* receive a YES decision + if (logger.isTraceEnabled()) { + logger.trace( + "No eligible node found to assign shard [{}] allocation_status [{}]", + shard, + allocationDecision.getAllocationStatus() + ); + } + + if (minNode != null) { + // throttle decision scenario + assert allocationDecision.getAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED; + final long shardSize = DiskThresholdDecider.getExpectedShardSize( + shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), + allocation.snapshotShardSizeInfo(), + allocation.metadata(), + allocation.routingTable() + ); + minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize)); + } else { + if (logger.isTraceEnabled()) { + logger.trace("No Node found to assign shard [{}]", shard); + } + } + + unassigned.ignoreShard(shard, allocationDecision.getAllocationStatus(), allocation.changes()); + if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas + while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) { + unassigned.ignoreShard(primary[++i], allocationDecision.getAllocationStatus(), allocation.changes()); + } + } + } + } + primaryLength = secondaryLength; + ShardRouting[] tmp = primary; + primary = secondary; + secondary = tmp; + secondaryLength = 0; + } while (primaryLength > 0); + // clear everything we have either added it or moved to ignoreUnassigned + } + + /** + * Make a decision for allocating an unassigned shard. This method returns a two values in a tuple: the + * first value is the {@link Decision} taken to allocate the unassigned shard, the second value is the + * {@link BalancedShardsAllocator.ModelNode} representing the node that the shard should be assigned to. If the decision returned + * is of type {@link Decision.Type#NO}, then the assigned node will be null. + */ + @Override + AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { + if (shard.assignedToNode()) { + // we only make decisions for unassigned shards here + return AllocateUnassignedDecision.NOT_TAKEN; + } + + final boolean explain = allocation.debugDecision(); + Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation); + if (shardLevelDecision.type() == Decision.Type.NO && explain == false) { + // NO decision for allocating the shard, irrespective of any particular node, so exit early + return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.DECIDERS_NO, null); + } + + /* find an node with minimal weight we can allocate on*/ + float minWeight = Float.POSITIVE_INFINITY; + BalancedShardsAllocator.ModelNode minNode = null; + Decision decision = null; + /* Don't iterate over an identity hashset here the + * iteration order is different for each run and makes testing hard */ + Map nodeExplanationMap = explain ? new HashMap<>() : null; + List> nodeWeights = explain ? new ArrayList<>() : null; + for (BalancedShardsAllocator.ModelNode node : nodes.values()) { + if (node.containsShard(shard) && explain == false) { + // decision is NO without needing to check anything further, so short circuit + continue; + } + + // weight of this index currently on the node + float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName()); + // moving the shard would not improve the balance, and we are not in explain mode, so short circuit + if (currentWeight > minWeight && explain == false) { + continue; + } + + Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation); + if (explain) { + nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); + nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); + } + if (currentDecision.type() == Decision.Type.YES || currentDecision.type() == Decision.Type.THROTTLE) { + final boolean updateMinNode; + if (currentWeight == minWeight) { + /* we have an equal weight tie breaking: + * 1. if one decision is YES prefer it + * 2. prefer the node that holds the primary for this index with the next id in the ring ie. + * for the 3 shards 2 replica case we try to build up: + * 1 2 0 + * 2 0 1 + * 0 1 2 + * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater + * than the id of the shard we need to assign. This works find when new indices are created since + * primaries are added first and we only add one shard set a time in this algorithm. + */ + if (currentDecision.type() == decision.type()) { + final int repId = shard.id(); + final int nodeHigh = node.highestPrimary(shard.index().getName()); + final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); + updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) + && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId)); + } else { + updateMinNode = currentDecision.type() == Decision.Type.YES; + } + } else { + updateMinNode = currentWeight < minWeight; + } + if (updateMinNode) { + minNode = node; + minWeight = currentWeight; + decision = currentDecision; + } + } + } + if (decision == null) { + // decision was not set and a node was not assigned, so treat it as a NO decision + decision = Decision.NO; + } + List nodeDecisions = null; + if (explain) { + nodeDecisions = new ArrayList<>(); + // fill in the correct weight ranking, once we've been through all nodes + nodeWeights.sort((nodeWeight1, nodeWeight2) -> Float.compare(nodeWeight1.v2(), nodeWeight2.v2())); + int weightRanking = 0; + for (Tuple nodeWeight : nodeWeights) { + NodeAllocationResult current = nodeExplanationMap.get(nodeWeight.v1()); + nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking)); + } + } + return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.getRoutingNode().node() : null, nodeDecisions); + } + + private static final Comparator BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed(); + + /** + * Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the + * balance model. Iff this method returns a true the relocation has already been executed on the + * simulation model as well as on the cluster. + */ + private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, BalancedShardsAllocator.ModelNode maxNode, String idx) { + final BalancedShardsAllocator.ModelIndex index = maxNode.getIndex(idx); + if (index != null) { + logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); + final Iterable shardRoutings = StreamSupport.stream(index.spliterator(), false) + .filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway + .filter(maxNode::containsShard) + .sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic + ::iterator; + + final AllocationDeciders deciders = allocation.deciders(); + for (ShardRouting shard : shardRoutings) { + final Decision rebalanceDecision = deciders.canRebalance(shard, allocation); + if (rebalanceDecision.type() == Decision.Type.NO) { + continue; + } + final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation); + if (allocationDecision.type() == Decision.Type.NO) { + continue; + } + + final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); + + maxNode.removeShard(shard); + long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + + if (decision.type() == Decision.Type.YES) { + /* only allocate on the cluster if we are not throttled */ + logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId()); + minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1()); + return true; + } else { + /* allocate on the model even if throttled */ + logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId()); + assert decision.type() == Decision.Type.THROTTLE; + minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize)); + return false; + } + } + } + logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); + return false; + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java new file mode 100644 index 0000000000000..593e6998141fb --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.allocator; + +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.MoveDecision; + +/** + *

+ * A {@link ShardsBalancer} helps the {@link BalancedShardsAllocator} to perform allocation and balancing + * operations on the cluster. + *

+ * + * @opensearch.internal + */ +public abstract class ShardsBalancer { + + /** + * Performs allocation of unassigned shards on nodes within the cluster. + */ + abstract void allocateUnassigned(); + + /** + * Moves shards that cannot be allocated to a node anymore. + */ + abstract void moveShards(); + + /** + * Balances the nodes on the cluster model. + */ + abstract void balance(); + + /** + * Make a decision for allocating an unassigned shard. + * @param shardRouting the shard for which the decision has to be made + * @return the allocation decision + */ + abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting); + + /** + * Makes a decision on whether to move a started shard to another node. + * @param shardRouting the shard for which the decision has to be made + * @return a move decision for the shard + */ + abstract MoveDecision decideMove(ShardRouting shardRouting); + + /** + * Makes a decision about moving a single shard to a different node to form a more + * optimally balanced cluster. + * @param shardRouting the shard for which the move decision has to be made + * @return a move decision for the shard + */ + abstract MoveDecision decideRebalance(ShardRouting shardRouting); + + /** + * Returns the average of shards per node for the given index + */ + public float avgShardsPerNode() { + return Float.MAX_VALUE; + } + + /** + * Returns the global average of shards per node + */ + public float avgShardsPerNode(String index) { + return Float.MAX_VALUE; + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index b5702431ed4bf..556e4db3400e1 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -54,6 +54,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.AbstractRunnable; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.engine.RecoveryEngineException; import org.opensearch.index.mapper.MapperException; @@ -244,8 +245,10 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); - boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); - final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled); + final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final boolean hasNoTranslog = IndexModule.Type.REMOTE_SNAPSHOT.match(indexShard.indexSettings()); + final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog) == false; + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; startRequest = getStartRecoveryRequest( @@ -253,7 +256,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi clusterService.localNode(), recoveryTarget, startingSeqNo, - !remoteTranslogEnabled + verifyTranslog ); requestToSend = startRequest; actionName = PeerRecoverySourceService.Actions.START_RECOVERY; diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index c1e29e0d866d8..b6122dbeeea09 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -44,6 +44,7 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.index.IndexModule; import org.opensearch.index.engine.Engine; import org.opensearch.index.mapper.MapperException; import org.opensearch.index.seqno.ReplicationTracker; @@ -355,10 +356,12 @@ public void cleanFiles( try { store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata); - // If Segment Replication is enabled, we need to reuse the primary's translog UUID already stored in the index. - // With Segrep, replicas should never create their own commit points. This ensures the index and xlog share the same - // UUID without the extra step to associate the index with a new xlog. - if (indexShard.indexSettings().isSegRepEnabled()) { + // Replicas for segment replication or remote snapshot indices do not create + // their own commit points and therefore do not modify the commit user data + // in their store. In these cases, reuse the primary's translog UUID. + final boolean reuseTranslogUUID = indexShard.indexSettings().isSegRepEnabled() + || IndexModule.Type.REMOTE_SNAPSHOT.match(indexShard.indexSettings()); + if (reuseTranslogUUID) { final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY); Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestGetDecommissionStateAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestGetDecommissionStateAction.java index 8bc89ebf37960..5d72adbd6ae08 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestGetDecommissionStateAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestGetDecommissionStateAction.java @@ -30,7 +30,7 @@ public class RestGetDecommissionStateAction extends BaseRestHandler { @Override public List routes() { - return singletonList(new Route(GET, "/_cluster/decommission/awareness/_status")); + return singletonList(new Route(GET, "/_cluster/decommission/awareness/{awareness_attribute_name}/_status")); } @Override @@ -41,6 +41,8 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { GetDecommissionStateRequest getDecommissionStateRequest = Requests.getDecommissionStateRequest(); - return channel -> client.admin().cluster().getDecommission(getDecommissionStateRequest, new RestToXContentListener<>(channel)); + String attributeName = request.param("awareness_attribute_name"); + getDecommissionStateRequest.attributeName(attributeName); + return channel -> client.admin().cluster().getDecommissionState(getDecommissionStateRequest, new RestToXContentListener<>(channel)); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestTests.java new file mode 100644 index 0000000000000..973485e1917f7 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestTests.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.decommission.awareness.get; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class GetDecommissionStateRequestTests extends OpenSearchTestCase { + public void testSerialization() throws IOException { + String attributeName = "zone"; + final GetDecommissionStateRequest originalRequest = new GetDecommissionStateRequest(attributeName); + final GetDecommissionStateRequest deserialized = copyWriteable( + originalRequest, + writableRegistry(), + GetDecommissionStateRequest::new + ); + assertEquals(deserialized.attributeName(), originalRequest.attributeName()); + } + + public void testValidation() { + { + String attributeName = null; + final GetDecommissionStateRequest request = new GetDecommissionStateRequest(attributeName); + ActionRequestValidationException e = request.validate(); + assertNotNull(e); + assertTrue(e.getMessage().contains("attribute name is missing")); + } + { + String attributeName = ""; + final GetDecommissionStateRequest request = new GetDecommissionStateRequest(attributeName); + ActionRequestValidationException e = request.validate(); + assertNotNull(e); + assertTrue(e.getMessage().contains("attribute name is missing")); + } + { + String attributeName = "zone"; + final GetDecommissionStateRequest request = new GetDecommissionStateRequest(attributeName); + ActionRequestValidationException e = request.validate(); + assertNull(e); + } + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponseTests.java index 97bc54d8d7b30..437faf2a75720 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponseTests.java @@ -8,7 +8,6 @@ package org.opensearch.action.admin.cluster.decommission.awareness.get; -import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.test.AbstractXContentTestCase; @@ -18,11 +17,13 @@ public class GetDecommissionStateResponseTests extends AbstractXContentTestCase { @Override protected GetDecommissionStateResponse createTestInstance() { - DecommissionStatus status = randomFrom(DecommissionStatus.values()); - String attributeName = randomAlphaOfLength(10); - String attributeValue = randomAlphaOfLength(10); - DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); - return new GetDecommissionStateResponse(decommissionAttribute, status); + DecommissionStatus status = null; + String attributeValue = null; + if (randomBoolean()) { + status = randomFrom(DecommissionStatus.values()); + attributeValue = randomAlphaOfLength(10); + } + return new GetDecommissionStateResponse(attributeValue, status); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java index d115ee0c515cc..ae10a92a5104e 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java @@ -10,6 +10,8 @@ import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.allocator.LocalShardsBalancer; +import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -45,7 +47,7 @@ public void testSettings() { * for IndexShardPerNode constraint satisfied and breached. */ public void testIndexShardsPerNodeConstraint() { - BalancedShardsAllocator.Balancer balancer = mock(BalancedShardsAllocator.Balancer.class); + ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); AllocationConstraints constraints = new AllocationConstraints(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java index a7b53a4c4bc8b..d29249cef0818 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java @@ -43,7 +43,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; -import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer; +import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; @@ -65,7 +65,7 @@ import static org.hamcrest.Matchers.startsWith; /** - * Tests for balancing a single shard, see {@link Balancer#decideRebalance(ShardRouting)}. + * Tests for balancing a single shard, see {@link ShardsBalancer#decideRebalance(ShardRouting)}. */ public class BalancedSingleShardTests extends OpenSearchAllocationTestCase { diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index d01db6376db42..03455adc8033d 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -35,7 +35,9 @@ group = 'hdfs' dependencies { api("org.apache.hadoop:hadoop-minicluster:3.3.4") { exclude module: 'websocket-client' + exclude module: 'jettison' } + api "org.codehaus.jettison:jettison:${versions.jettison}" api "org.apache.commons:commons-compress:1.21" api "commons-codec:commons-codec:${versions.commonscodec}" api "org.apache.logging.log4j:log4j-core:${versions.log4j}"