diff --git a/.github/ISSUE_TEMPLATE/failed_check.md b/.github/ISSUE_TEMPLATE/failed_check.md
index 24a323b7dd0f7..86449bbfad36f 100644
--- a/.github/ISSUE_TEMPLATE/failed_check.md
+++ b/.github/ISSUE_TEMPLATE/failed_check.md
@@ -1,9 +1,8 @@
 ---
-title: '[AUTOCUT] Gradle Check Failure.'
+title: '[AUTOCUT] Gradle Check Failure on push to {{ env.branch_name }}'
 labels: '>test-failure, bug'
 ---
-A gradle check workflow has failed after merge.
-
-PR: {{ env.workflow_url }}
-CommitId: {{ env.pr_from_sha }}
+Gradle check has failed on push of your commit {{ env.pr_from_sha }}.
+Please examine the workflow log {{ env.workflow_url }}.
+Is the failure [a flaky test](https://github.com/opensearch-project/OpenSearch/blob/main/DEVELOPER_GUIDE.md#flaky-tests) unrelated to your change?
diff --git a/.github/workflows/gradle-check.yml b/.github/workflows/gradle-check.yml
index 2cfcfa8d9d1f9..ab0e4a6c94252 100644
--- a/.github/workflows/gradle-check.yml
+++ b/.github/workflows/gradle-check.yml
@@ -41,6 +41,7 @@ jobs:
           repo_url="https://github.com/opensearch-project/OpenSearch"
           ref_id=$(git rev-parse HEAD)
           branch_name=$(git rev-parse --abbrev-ref HEAD)
+          echo "branch_name=$branch_name" >> $GITHUB_ENV
           echo "pr_from_sha=$ref_id" >> $GITHUB_ENV
           echo "pr_from_clone_url=$repo_url" >> $GITHUB_ENV
           echo "pr_to_clone_url=$repo_url" >> $GITHUB_ENV
diff --git a/libs/common/src/main/java/org/opensearch/common/collect/Triplet.java b/libs/common/src/main/java/org/opensearch/common/collect/Triplet.java
new file mode 100644
index 0000000000000..9b7e8f6a419a6
--- /dev/null
+++ b/libs/common/src/main/java/org/opensearch/common/collect/Triplet.java
@@ -0,0 +1,63 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+package org.opensearch.common.collect;
+
+import java.util.Objects;
+
+/**
+ * A container for 3 elements, similar to {@link org.opensearch.common.collect.Tuple}
+ *
+ * @opensearch.internal
+ */
+public class Triplet<V1, V2, V3> {
+
+    public static <V1, V2, V3> Triplet<V1, V2, V3> tuple(V1 v1, V2 v2, V3 v3) {
+        return new Triplet<>(v1, v2, v3);
+    }
+
+    private final V1 v1;
+    private final V2 v2;
+    private final V3 v3;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Triplet<?, ?, ?> triplet = (Triplet<?, ?, ?>) o;
+        return Objects.equals(v1, triplet.v1) && Objects.equals(v2, triplet.v2) && Objects.equals(v3, triplet.v3);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(v1, v2, v3);
+    }
+
+    public Triplet(V1 v1, V2 v2, V3 v3) {
+        this.v1 = v1;
+        this.v2 = v2;
+        this.v3 = v3;
+    }
+
+    public V1 v1() {
+        return v1;
+    }
+
+    public V2 v2() {
+        return v2;
+    }
+
+    public V3 v3() {
+        return v3;
+    }
+
+    @Override
+    public String toString() {
+        return "Triplet [v1=" + v1 + ", v2=" + v2 + ", v3=" + v3 + "]";
+    }
+}
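A minimal usage sketch for the new Triplet container (the example class and the values below are illustrative, not part of the change):

    import org.opensearch.common.collect.Triplet;

    public class TripletExample {
        public static void main(String[] args) {
            // The static factory infers all three type parameters.
            Triplet<String, Integer, Boolean> t = Triplet.tuple("shards", 5, true);
            // Elements are read back positionally, mirroring Tuple's v1()/v2() accessors.
            System.out.println(t.v1() + " -> " + t.v2() + ", primary=" + t.v3());
        }
    }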
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java
new file mode 100644
index 0000000000000..7864bc4668cd5
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java
@@ -0,0 +1,307 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.routing.RoutingNode;
+import org.opensearch.cluster.routing.RoutingNodes;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.index.IndexModule;
+import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.test.InternalTestCluster;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.util.ArrayList;
+import java.util.Formatter;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class SegmentReplicationAllocationIT extends SegmentReplicationBaseIT {
+
+    private void createIndex(String idxName, int shardCount, int replicaCount, boolean isSegRep) {
+        Settings.Builder builder = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount)
+            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount);
+        if (isSegRep) {
+            builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
+        } else {
+            builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT);
+        }
+        prepareCreate(idxName, builder).get();
+    }
+
+    /**
+     * This test verifies that primary shard allocation is balanced.
+     */
+    public void testShardAllocation() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+        final int maxReplicaCount = 2;
+        final int maxShardCount = 5;
+        final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10);
+        final int numberOfIndices = randomIntBetween(5, 10);
+
+        final List<String> nodeNames = new ArrayList<>();
+        logger.info("--> Creating {} nodes", nodeCount);
+        for (int i = 0; i < nodeCount; i++) {
+            nodeNames.add(internalCluster().startNode());
+        }
+        assertAcked(
+            client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(
+                    Settings.builder().put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f")
+                )
+        );
+
+        int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0;
+        ShardAllocations shardAllocations = new ShardAllocations();
+        ClusterState state;
+        for (int i = 0; i < numberOfIndices; i++) {
+            shardCount = randomIntBetween(1, maxShardCount);
+            totalShardCount += shardCount;
+            replicaCount = randomIntBetween(0, maxReplicaCount);
+            totalReplicaCount += replicaCount;
+            createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
+            logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount);
+            assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS);
+            state = client().admin().cluster().prepareState().execute().actionGet().getState();
+            shardAllocations.printShardDistribution(state);
+        }
+        state = client().admin().cluster().prepareState().execute().actionGet().getState();
+        RoutingNodes nodes = state.getRoutingNodes();
+        final float avgNumShards = (float) (totalShardCount) / (float) (nodes.size());
+        final int minAvgNumberOfShards = (int) Math.floor(avgNumShards - 1.0f);
+        final int maxAvgNumberOfShards = (int) Math.ceil(avgNumShards + 1.0f);
+
+        for (RoutingNode node : nodes) {
+            assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards);
+            assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards);
+        }
+    }
+
+    /**
+     * This test verifies that shard allocation under cluster configuration changes, i.e. node addition and removal,
+     * keeps the primary shard allocation balanced.
+     */
+    public void testAllocationWithDisruption() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+        final int maxReplicaCount = 2;
+        final int maxShardCount = 5;
+        final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10);
+        final int numberOfIndices = randomIntBetween(1, 10);
+
+        logger.info("--> Creating {} nodes", nodeCount);
+        final List<String> nodeNames = new ArrayList<>();
+        for (int i = 0; i < nodeCount; i++) {
+            nodeNames.add(internalCluster().startNode());
+        }
+        assertAcked(
+            client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(
+                    Settings.builder()
+                        .put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f")
+                        .put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0f")
+                        .put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0f")
+                        .build()
+                )
+        );
+
+        int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0;
+        ShardAllocations shardAllocations = new ShardAllocations();
+        ClusterState state;
+        for (int i = 0; i < numberOfIndices; i++) {
+            shardCount = randomIntBetween(1, maxShardCount);
+            totalShardCount += shardCount;
+            replicaCount = randomIntBetween(1, maxReplicaCount);
+            totalReplicaCount += replicaCount;
+            logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount);
+            createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
+            assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS);
+            if (logger.isTraceEnabled()) {
+                state = client().admin().cluster().prepareState().execute().actionGet().getState();
+                shardAllocations.printShardDistribution(state);
+            }
+        }
+        state = client().admin().cluster().prepareState().execute().actionGet().getState();
+        float avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size());
+        int minAvgNumberOfShards = (int) Math.floor(avgNumShards - 1.0f);
+        int maxAvgNumberOfShards = (int) Math.ceil(avgNumShards + 1.0f);
+
+        for (RoutingNode node : state.getRoutingNodes()) {
+            assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards);
+            assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards);
+        }
+
+        final int additionalNodeCount = randomIntBetween(1, 5);
+        logger.info("--> Adding {} nodes", additionalNodeCount);
+
+        internalCluster().startNodes(additionalNodeCount);
+        assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS);
+        state = client().admin().cluster().prepareState().execute().actionGet().getState();
+        avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size());
+        minAvgNumberOfShards = (int) Math.floor(avgNumShards - 1.0f);
+        maxAvgNumberOfShards = (int) Math.ceil(avgNumShards + 1.0f);
+        shardAllocations.printShardDistribution(state);
+        for (RoutingNode node : state.getRoutingNodes()) {
+            assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards);
+            assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards);
+        }
+
+        logger.info("--> Stop one third of the nodes");
+        for (int i = 1; i < nodeCount; i += 3) {
+            internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(i)));
+            // give the replica a chance to be promoted to primary before terminating the node containing the replica
+            assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS);
+        }
+        state = client().admin().cluster().prepareState().execute().actionGet().getState();
+        avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size());
+        minAvgNumberOfShards = (int) Math.floor(avgNumShards - 1.0f);
+        maxAvgNumberOfShards = (int) Math.ceil(avgNumShards + 1.0f);
+        shardAllocations.printShardDistribution(state);
+
+        for (RoutingNode node : state.getRoutingNodes()) {
+            assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards);
+            assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards);
+        }
+    }
+
+    /**
+     * This class is created for debugging purposes, to show shard allocation across nodes. It keeps the cluster state,
+     * which is used to build each node's shard allocation.
+     */
+    private class ShardAllocations {
+        ClusterState state;
+
+        public static final String separator = "===================================================";
+        public static final String ONE_LINE_RETURN = "\n";
+        public static final String TWO_LINE_RETURN = "\n\n";
+
+        /**
+         Stores the primary/replica shard count per node for segrep indices.
+         String: NodeId
+         int[]: tuple storing primary shard count at index 0 and replica shard count at index 1
+         */
+        TreeMap<String, int[]> nodeToSegRepCountMap = new TreeMap<>();
+        /**
+         Stores the primary/replica shard count per node for docrep indices.
+         String: NodeId
+         int[]: tuple storing primary shard count at index 0 and replica shard count at index 1
+         */
+        TreeMap<String, int[]> nodeToDocRepCountMap = new TreeMap<>();
+
+        /**
+         * Helper map containing NodeName to NodeId
+         */
+        TreeMap<String, String> nameToNodeId = new TreeMap<>();
+
+        /*
+        Unassigned array containing primary at 0, replica at 1
+        */
+        int[] unassigned = new int[2];
+
+        int[] totalShards = new int[2];
+
+        public final String printShardAllocationWithHeader(int[] docrep, int[] segrep) {
+            StringBuffer sb = new StringBuffer();
+            Formatter formatter = new Formatter(sb, Locale.getDefault());
+            formatter.format("%-20s %-20s %-20s %-20s\n", "P", docrep[0] + segrep[0], docrep[0], segrep[0]);
+            formatter.format("%-20s %-20s %-20s %-20s\n", "R", docrep[1] + segrep[1], docrep[1], segrep[1]);
+            return sb.toString();
+        }
+
+        public void reset() {
+            nodeToSegRepCountMap.clear();
+            nodeToDocRepCountMap.clear();
+            nameToNodeId.clear();
+            totalShards[0] = totalShards[1] = 0;
+            unassigned[0] = unassigned[1] = 0;
+        }
+
+        public void setState(ClusterState state) {
+            this.reset();
+            this.state = state;
+            buildMap();
+        }
+
+        private void buildMap() {
+            for (RoutingNode node : state.getRoutingNodes()) {
+                nameToNodeId.putIfAbsent(node.node().getName(), node.nodeId());
+                nodeToSegRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 });
+                nodeToDocRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 });
+            }
+            for (ShardRouting shardRouting : state.routingTable().allShards()) {
+                // Update the count map matching the index's replication type
+                if (isIndexSegRep(shardRouting.getIndexName())) {
+                    updateMap(nodeToSegRepCountMap, shardRouting);
+                } else {
+                    updateMap(nodeToDocRepCountMap, shardRouting);
+                }
+            }
+        }
+
+        void updateMap(TreeMap<String, int[]> mapToUpdate, ShardRouting shardRouting) {
+            int[] shard;
+            shard = shardRouting.assignedToNode() ? mapToUpdate.get(shardRouting.currentNodeId()) : unassigned;
+            // Update shard type count
+            if (shardRouting.primary()) {
+                shard[0]++;
+                totalShards[0]++;
+            } else {
+                shard[1]++;
+                totalShards[1]++;
+            }
+            // For assigned shards, put back counter
+            if (shardRouting.assignedToNode()) mapToUpdate.put(shardRouting.currentNodeId(), shard);
+        }
+
+        boolean isIndexSegRep(String indexName) {
+            return state.metadata()
+                .index(indexName)
+                .getSettings()
+                .get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey())
+                .equals(ReplicationType.SEGMENT.toString());
+        }
+
+        @Override
+        public String toString() {
+            StringBuffer sb = new StringBuffer();
+            sb.append(TWO_LINE_RETURN + separator + ONE_LINE_RETURN);
+            Formatter formatter = new Formatter(sb, Locale.getDefault());
+            for (Map.Entry<String, String> entry : nameToNodeId.entrySet()) {
+                String nodeId = entry.getValue();
+                formatter.format("%-20s %-20s %-20s %-20s\n", entry.getKey().toUpperCase(Locale.getDefault()), "TOTAL", "DOCREP", "SEGREP");
+                sb.append(printShardAllocationWithHeader(nodeToDocRepCountMap.get(nodeId), nodeToSegRepCountMap.get(nodeId)));
+            }
+            sb.append(ONE_LINE_RETURN);
+            formatter.format("%-20s %-20s %-20s\n\n", "Unassigned ", unassigned[0], unassigned[1]);
+            formatter.format("%-20s %-20s %-20s\n\n", "Total Shards", totalShards[0], totalShards[1]);
+            return sb.toString();
+        }
+
+        public void printShardDistribution(ClusterState state) {
+            this.setState(state);
+            logger.info("--> Shard distribution {}", this);
+        }
+    }
+}
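Both tests above assert that every node ends up within one shard of the cluster-wide average of primaries. A standalone sketch of that bound arithmetic, with made-up totals:

    public class PrimaryBalanceBounds {
        public static void main(String[] args) {
            int totalPrimaryShards = 22; // hypothetical totals, not taken from the tests
            int nodeCount = 4;
            float avg = (float) totalPrimaryShards / nodeCount; // 5.5
            int min = (int) Math.floor(avg - 1.0f);             // 4
            int max = (int) Math.ceil(avg + 1.0f);              // 7
            System.out.println("each node must hold between " + min + " and " + max + " started primaries");
        }
    }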
"refreshed-or-flushed-operations"; + private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total"; + private static final String MAX_SEQ_NO_REFRESHED_OR_FLUSHED = "max-seq-no-refreshed-or-flushed"; @Override protected Collection> nodePlugins() { @@ -38,10 +51,15 @@ protected Collection> nodePlugins() { @Override public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } + + private Settings remoteStoreIndexSettings(int numberOfReplicas) { return Settings.builder() .put(super.indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put("index.refresh_interval", "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) @@ -49,6 +67,14 @@ public Settings indexSettings() { .build(); } + private Settings remoteTranslogIndexSettings(int numberOfReplicas) { + return Settings.builder() + .put(remoteStoreIndexSettings(numberOfReplicas)) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + @Override protected boolean addMockInternalEngine() { return false; @@ -63,18 +89,95 @@ protected Settings featureFlagSettings() { .build(); } - // This is a dummy test to check if create index flow is working as expected. - // As index creation is pre-requisite for each integration test in this class, once we add more integ tests, - // we can remove this test. - public void testIndexCreation() { - internalCluster().startNode(); - + @Before + public void setup() { Path absolutePath = randomRepoPath().toAbsolutePath(); assertAcked( clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) ); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + private Map indexData() { + long totalOperations = 0; + long refreshedOrFlushedOperations = 0; + long maxSeqNo = 0; + long maxSeqNoRefreshedOrFlushed = 0; + for (int i = 0; i < randomIntBetween(1, 10); i++) { + if (randomBoolean()) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + maxSeqNoRefreshedOrFlushed = maxSeqNo; + refreshedOrFlushedOperations = totalOperations; + int numberOfOperations = randomIntBetween(20, 50); + for (int j = 0; j < numberOfOperations; j++) { + IndexResponse response = indexSingleDoc(); + maxSeqNo = response.getSeqNo(); + } + totalOperations += numberOfOperations; + } + Map indexingStats = new HashMap<>(); + indexingStats.put(TOTAL_OPERATIONS, totalOperations); + indexingStats.put(REFRESHED_OR_FLUSHED_OPERATIONS, refreshedOrFlushedOperations); + indexingStats.put(MAX_SEQ_NO_TOTAL, maxSeqNo); + indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED, maxSeqNoRefreshedOrFlushed); + return indexingStats; + } - createIndex(INDEX_NAME); + private void verifyRestoredData(Map indexStats, boolean checkTotal) { + String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS; + String maxSeqNoGranularity = checkTotal ? 
MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED; ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity)); + IndexResponse response = indexSingleDoc(); + assertEquals(indexStats.get(maxSeqNoGranularity) + 1, response.getSeqNo()); + refresh(INDEX_NAME); + assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1); + } + + public void testRemoteStoreRestoreFromRemoteSegmentStore() throws IOException { + internalCluster().startNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + Map indexStats = indexData(); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); + + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); + + verifyRestoredData(indexStats, false); + } + + public void testRemoteTranslogRestore() throws IOException { + internalCluster().startNodes(3); + createIndex(INDEX_NAME, remoteTranslogIndexSettings(0)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + Map indexStats = indexData(); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); + + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); + + verifyRestoredData(indexStats, true); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java index 413ddff72f7a5..d23b4856eced0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java @@ -329,6 +329,23 @@ public List shardsWithState(ShardRoutingState... states) { return shards; } + /** + * Determine the primary shards of an index with a specific state + * @param states set of states which should be listed + * @return a list of shards + */ + public List primaryShardsWithState(ShardRoutingState... 
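The two restore tests above differ only in the index settings and in whether unflushed operations are expected back. A self-contained sketch of the stats bookkeeping that drives verifyRestoredData (the numbers are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class RestoreStatsSketch {
        public static void main(String[] args) {
            Map<String, Long> stats = new HashMap<>();
            stats.put("total-operations", 120L);               // everything indexed
            stats.put("refreshed-or-flushed-operations", 90L); // subset durably uploaded to the segment store
            // Without a remote translog, only refreshed/flushed operations survive a restore;
            // with it, translog replay brings back the full total.
            boolean remoteTranslogEnabled = false;
            String key = remoteTranslogEnabled ? "total-operations" : "refreshed-or-flushed-operations";
            System.out.println("expected hit count after restore: " + stats.get(key));
        }
    }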
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
index 413ddff72f7a5..d23b4856eced0 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
@@ -329,6 +329,23 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
         return shards;
     }
 
+    /**
+     * Determine the primary shards of an index with a specific state
+     * @param states set of states which should be listed
+     * @return a list of primary shards
+     */
+    public List<ShardRouting> primaryShardsWithState(ShardRoutingState... states) {
+        List<ShardRouting> shards = new ArrayList<>();
+        for (ShardRouting shardEntry : this) {
+            for (ShardRoutingState state : states) {
+                if (shardEntry.state() == state && shardEntry.primary()) {
+                    shards.add(shardEntry);
+                }
+            }
+        }
+        return shards;
+    }
+
     /**
      * Determine the shards of an index with a specific state
      * @param index id of the index
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java
index 4f74304471991..b14f0f8eb3c35 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java
@@ -202,7 +202,8 @@ private IndexMetadata.Builder updateInSyncAllocations(
             if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) {
                 allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
             } else {
-                assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource;
+                assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource
+                    || recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource;
                 allocationId = updates.initializedPrimary.allocationId().getId();
             }
             // forcing a stale primary resets the in-sync allocations to the singleton set with the stale id
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index d8761e9b1a78e..8893aaa54799a 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -107,6 +107,14 @@ public class BalancedShardsAllocator implements ShardsAllocator {
         Property.NodeScope
     );
 
+    public static final Setting<Float> PRIMARY_SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting(
+        "cluster.routing.allocation.balance.primary",
+        0.0f,
+        0.0f,
+        Property.Dynamic,
+        Property.NodeScope
+    );
+
     private volatile boolean movePrimaryFirst;
     private volatile WeightFunction weightFunction;
     private volatile float threshold;
@@ -117,10 +125,19 @@ public BalancedShardsAllocator(Settings settings) {
 
     @Inject
     public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
-        setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
+        setWeightFunction(
+            INDEX_BALANCE_FACTOR_SETTING.get(settings),
+            SHARD_BALANCE_FACTOR_SETTING.get(settings),
+            PRIMARY_SHARD_BALANCE_FACTOR_SETTING.get(settings)
+        );
         setThreshold(THRESHOLD_SETTING.get(settings));
         clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
-        clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
+        clusterSettings.addSettingsUpdateConsumer(
+            INDEX_BALANCE_FACTOR_SETTING,
+            SHARD_BALANCE_FACTOR_SETTING,
+            PRIMARY_SHARD_BALANCE_FACTOR_SETTING,
+            this::setWeightFunction
+        );
         clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
     }
 
@@ -128,8 +145,8 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) {
         this.movePrimaryFirst = movePrimaryFirst;
     }
 
-    private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
-        weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
+    private void setWeightFunction(float indexBalance, float shardBalanceFactor, float primaryShardBalance) {
+        weightFunction = new WeightFunction(indexBalance, shardBalanceFactor, primaryShardBalance);
     }
 
     private void setThreshold(float threshold) {
@@ -252,17 +269,22 @@ static class WeightFunction {
         private final float shardBalance;
         private final float theta0;
         private final float theta1;
+        private final float theta2;
+        private final float primaryShardBalance;
         private AllocationConstraints constraints;
 
-        WeightFunction(float indexBalance, float shardBalance) {
-            float sum = indexBalance + shardBalance;
+        WeightFunction(float indexBalance, float shardBalance, float primaryShardBalance) {
+            float sum = indexBalance + shardBalance + primaryShardBalance;
             if (sum <= 0.0f) {
                 throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
             }
             theta0 = shardBalance / sum;
             theta1 = indexBalance / sum;
+            theta2 = primaryShardBalance / sum;
+
             this.indexBalance = indexBalance;
             this.shardBalance = shardBalance;
+            this.primaryShardBalance = primaryShardBalance;
             this.constraints = new AllocationConstraints();
         }
 
@@ -274,7 +296,9 @@ public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) {
         float weight(ShardsBalancer balancer, ModelNode node, String index) {
             final float weightShard = node.numShards() - balancer.avgShardsPerNode();
             final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
-            return theta0 * weightShard + theta1 * weightIndex;
+            final float primaryWeightShard = node.numPrimaryShards() - balancer.avgPrimaryShardsPerNode();
+
+            return theta0 * weightShard + theta1 * weightIndex + theta2 * primaryWeightShard;
         }
     }
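A back-of-the-envelope check of the extended weight function, using the same factor values as the testBalanceDefaults case further below (the node counts are made up):

    public class WeightFunctionSketch {
        public static void main(String[] args) {
            float indexBalance = 0.55f, shardBalance = 0.45f, primaryBalance = 0.40f;
            float sum = indexBalance + shardBalance + primaryBalance; // 1.40
            float theta0 = shardBalance / sum;   // total-shard component
            float theta1 = indexBalance / sum;   // per-index component
            float theta2 = primaryBalance / sum; // new primary-shard component
            // Hypothetical node: 12 shards (cluster avg 10), 4 of this index (avg 3), 7 primaries (avg 5).
            float weight = theta0 * (12 - 10) + theta1 * (4 - 3) + theta2 * (7 - 5);
            System.out.println("node weight = " + weight); // positive => above average, a rebalance candidate
        }
    }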
@@ -288,6 +312,8 @@ public static class ModelNode implements Iterable<ModelIndex> {
         private int numShards = 0;
         private final RoutingNode routingNode;
 
+        private int primaryNumShards = 0;
+
         ModelNode(RoutingNode routingNode) {
             this.routingNode = routingNode;
         }
@@ -313,6 +339,10 @@ public int numShards(String idx) {
             return index == null ? 0 : index.numShards();
         }
 
+        public int numPrimaryShards() {
+            return primaryNumShards;
+        }
+
         public int highestPrimary(String index) {
             ModelIndex idx = indices.get(index);
             if (idx != null) {
@@ -329,6 +359,9 @@ public void addShard(ShardRouting shard) {
             }
             index.addShard(shard);
             numShards++;
+            if (shard.primary()) {
+                primaryNumShards++;
+            }
         }
 
         public void removeShard(ShardRouting shard) {
@@ -339,6 +372,9 @@ public void removeShard(ShardRouting shard) {
                     indices.remove(shard.getIndexName());
                 }
             }
+            if (shard.primary()) {
+                primaryNumShards--;
+            }
             numShards--;
         }
 
@@ -381,13 +417,14 @@ public Balancer(
     }
 
     /**
-     * A model index.
+     * A model index that stores info about a specific index
      *
      * @opensearch.internal
     */
    static final class ModelIndex implements Iterable<ShardRouting> {
        private final String id;
        private final Set<ShardRouting> shards = new HashSet<>(4); // expect few shards of same index to be allocated on same node
+       private final Set<ShardRouting> primaryShards = new HashSet<>();
        private int highestPrimary = -1;
 
        ModelIndex(String id) {
@@ -415,6 +452,10 @@ public int numShards() {
            return shards.size();
        }
 
+        public int numPrimaryShards() {
+            return primaryShards.size();
+        }
+
        @Override
        public Iterator<ShardRouting> iterator() {
            return shards.iterator();
@@ -423,12 +464,20 @@ public Iterator<ShardRouting> iterator() {
        public void removeShard(ShardRouting shard) {
            highestPrimary = -1;
            assert shards.contains(shard) : "Shard not allocated on current node: " + shard;
+           if (shard.primary()) {
+               assert primaryShards.contains(shard) : "Primary shard not allocated on current node: " + shard;
+               primaryShards.remove(shard);
+           }
            shards.remove(shard);
        }
 
        public void addShard(ShardRouting shard) {
            highestPrimary = -1;
-           assert !shards.contains(shard) : "Shard already allocated on current node: " + shard;
+           assert shards.contains(shard) == false : "Shard already allocated on current node: " + shard;
+           if (shard.primary()) {
+               assert primaryShards.contains(shard) == false : "Primary shard already allocated on current node: " + shard;
+               primaryShards.add(shard);
+           }
            shards.add(shard);
        }
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java
index 8570a16fd690c..34ad9761b3f1a 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java
@@ -63,6 +63,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
     private final float threshold;
     private final Metadata metadata;
     private final float avgShardsPerNode;
+    private final float avgPrimaryShardsPerNode;
     private final BalancedShardsAllocator.NodeSorter sorter;
     private final Set<String> inEligibleTargetNode;
 
@@ -81,6 +82,8 @@ public LocalShardsBalancer(
         this.routingNodes = allocation.routingNodes();
         this.metadata = allocation.metadata();
         avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size();
+        final int shardCount = StreamSupport.stream(metadata.spliterator(), false).mapToInt(IndexMetadata::getNumberOfShards).sum();
+        avgPrimaryShardsPerNode = (float) shardCount / routingNodes.size();
         nodes = Collections.unmodifiableMap(buildModelFromAssigned());
         sorter = newNodeSorter();
         inEligibleTargetNode = new HashSet<>();
@@ -101,6 +104,11 @@ public float avgShardsPerNode(String index) {
         return ((float) metadata.index(index).getTotalNumberOfShards()) / nodes.size();
     }
 
+    @Override
+    public float avgPrimaryShardsPerNode() {
+        return avgPrimaryShardsPerNode;
+    }
+
     /**
      * Returns the global average of shards per node
     */
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java
index 593e6998141fb..9774f84a4cd91 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java
@@ -60,16 +60,23 @@ public abstract class ShardsBalancer {
     abstract MoveDecision decideRebalance(ShardRouting shardRouting);
 
     /**
-     * Returns the average of shards per node for the given index
+     * Returns the global average of shards per node
     */
    public float avgShardsPerNode() {
        return Float.MAX_VALUE;
    }
 
    /**
-     * Returns the global average of shards per node
+     * Returns the average of shards per node for the given index
     */
    public float avgShardsPerNode(String index) {
        return Float.MAX_VALUE;
    }
+
+    /**
+     * Returns the average of primary shards per node
+     */
+    public float avgPrimaryShardsPerNode() {
+        return Float.MAX_VALUE;
+    }
 }
diff --git a/server/src/main/java/org/opensearch/common/TriConsumer.java b/server/src/main/java/org/opensearch/common/TriConsumer.java
index f98276b6d007d..a174499d58628 100644
--- a/server/src/main/java/org/opensearch/common/TriConsumer.java
+++ b/server/src/main/java/org/opensearch/common/TriConsumer.java
@@ -32,6 +32,8 @@
 
 package org.opensearch.common;
 
+import java.util.Objects;
+
 /**
  * Represents an operation that accepts three arguments and returns no result.
  *
@@ -50,5 +52,14 @@
      * @param t the second function argument
      * @param u the third function argument
     */
-    void apply(S s, T t, U u);
+    void accept(S s, T t, U u);
+
+    default TriConsumer<S, T, U> andThen(TriConsumer<? super S, ? super T, ? super U> after) {
+        Objects.requireNonNull(after);
+
+        return (l, r, s) -> {
+            accept(l, r, s);
+            after.accept(l, r, s);
+        };
+    }
 }
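A small sketch of the renamed accept and the new andThen chaining (the consumers here are illustrative):

    import org.opensearch.common.TriConsumer;

    public class TriConsumerExample {
        public static void main(String[] args) {
            TriConsumer<String, Float, Float> apply = (key, oldVal, newVal) ->
                System.out.println(key + ": " + oldVal + " -> " + newVal);
            TriConsumer<String, Float, Float> audit = (key, oldVal, newVal) ->
                System.out.println("audited " + key);
            // andThen runs both consumers, in order, with the same three arguments.
            apply.andThen(audit).accept("cluster.routing.allocation.balance.primary", 0.0f, 1.0f);
        }
    }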
diff --git a/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java
index 8a19d309975df..50f6714ec41b9 100644
--- a/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java
@@ -38,6 +38,7 @@
 import org.apache.lucene.search.spell.LevenshteinDistance;
 import org.apache.lucene.util.CollectionUtil;
 import org.opensearch.ExceptionsHelper;
+import org.opensearch.common.TriConsumer;
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.regex.Regex;
 
@@ -444,6 +445,28 @@ public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer) {
         addSettingsUpdateConsumer(a, b, consumer, (i, j) -> {});
     }
 
+    /**
+     * Adds a settings consumer that accepts the values for three settings. The consumer is only notified if any one of
+     * the settings changed and if the provided validator succeeded.
+     * <p>
+     * Note: Only settings registered in {@link SettingsModule} can be changed dynamically.
+     * </p>
+     * This method registers a compound updater that is useful if three settings depend on each other.
+     * The consumer is always provided with all three values even if only one of the three changes.
+     */
+    public synchronized <A, B, C> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, Setting<C> c, TriConsumer<A, B, C> consumer) {
+        if (a != get(a.getKey())) {
+            throw new IllegalArgumentException("Setting is not registered for key [" + a.getKey() + "]");
+        }
+        if (b != get(b.getKey())) {
+            throw new IllegalArgumentException("Setting is not registered for key [" + b.getKey() + "]");
+        }
+        if (c != get(c.getKey())) {
+            throw new IllegalArgumentException("Setting is not registered for key [" + c.getKey() + "]");
+        }
+        addSettingsUpdater(Setting.compoundUpdater(consumer, (i, j, k) -> {}, a, b, c, logger));
+    }
+
     /**
      * Adds a settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change
      * and if the provided validator succeeded.
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index e616521cdf4f3..f1a802c8be168 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -230,6 +230,7 @@ public void apply(Settings value, Settings current, Settings previous) {
             AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING,
             BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
             BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING,
+            BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING,
             BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
             BalancedShardsAllocator.THRESHOLD_SETTING,
             BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
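Because the new factor is registered as a dynamic cluster setting, it can be flipped at runtime and the compound updater rebuilds the WeightFunction with all three values. The integration tests above toggle it exactly this way (test-client scaffolding assumed):

    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setPersistentSettings(
            Settings.builder().put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f")
        )
        .get();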
diff --git a/server/src/main/java/org/opensearch/common/settings/Setting.java b/server/src/main/java/org/opensearch/common/settings/Setting.java
index 26a60e24b86b2..9e57f321cbca4 100644
--- a/server/src/main/java/org/opensearch/common/settings/Setting.java
+++ b/server/src/main/java/org/opensearch/common/settings/Setting.java
@@ -39,6 +39,8 @@
 import org.opensearch.common.Booleans;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.Strings;
+import org.opensearch.common.TriConsumer;
+import org.opensearch.common.collect.Triplet;
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.regex.Regex;
 import org.opensearch.common.unit.ByteSizeValue;
@@ -722,6 +724,60 @@ public String toString() {
         };
     }
 
+    /**
+     * Updates settings that depend on each other.
+     *
+     * See {@link AbstractScopedSettings#addSettingsUpdateConsumer(Setting, Setting, Setting, TriConsumer)} and its usage for details.
+     */
+    static <A, B, C> AbstractScopedSettings.SettingUpdater<Triplet<A, B, C>> compoundUpdater(
+        final TriConsumer<A, B, C> consumer,
+        final TriConsumer<A, B, C> validator,
+        final Setting<A> aSetting,
+        final Setting<B> bSetting,
+        final Setting<C> cSetting,
+        Logger logger
+    ) {
+        final AbstractScopedSettings.SettingUpdater<A> aSettingUpdater = aSetting.newUpdater(null, logger);
+        final AbstractScopedSettings.SettingUpdater<B> bSettingUpdater = bSetting.newUpdater(null, logger);
+        final AbstractScopedSettings.SettingUpdater<C> cSettingUpdater = cSetting.newUpdater(null, logger);
+        return new AbstractScopedSettings.SettingUpdater<Triplet<A, B, C>>() {
+            @Override
+            public boolean hasChanged(Settings current, Settings previous) {
+                return aSettingUpdater.hasChanged(current, previous)
+                    || bSettingUpdater.hasChanged(current, previous)
+                    || cSettingUpdater.hasChanged(current, previous);
+            }
+
+            @Override
+            public Triplet<A, B, C> getValue(Settings current, Settings previous) {
+                A valueA = aSettingUpdater.getValue(current, previous);
+                B valueB = bSettingUpdater.getValue(current, previous);
+                C valueC = cSettingUpdater.getValue(current, previous);
+                validator.accept(valueA, valueB, valueC);
+                return new Triplet<>(valueA, valueB, valueC);
+            }
+
+            @Override
+            public void apply(Triplet<A, B, C> value, Settings current, Settings previous) {
+                if (aSettingUpdater.hasChanged(current, previous)) {
+                    logSettingUpdate(aSetting, current, previous, logger);
+                }
+                if (bSettingUpdater.hasChanged(current, previous)) {
+                    logSettingUpdate(bSetting, current, previous, logger);
+                }
+                if (cSettingUpdater.hasChanged(current, previous)) {
+                    logSettingUpdate(cSetting, current, previous, logger);
+                }
+                consumer.accept(value.v1(), value.v2(), value.v3());
+            }
+
+            @Override
+            public String toString() {
+                return "CompoundUpdater for: " + aSettingUpdater + " and " + bSettingUpdater + " and " + cSettingUpdater;
+            }
+        };
+    }
+
     static AbstractScopedSettings.SettingUpdater<Settings> groupedSettingsUpdater(
         Consumer<Settings> consumer,
         final List<Setting<?>> configuredSettings
diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java
index 500d06b3ccc3d..8118760cd1bb2 100644
--- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java
+++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java
@@ -565,6 +565,22 @@ private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOException {
         List<HashMap<String, ?>> unreadExtensions = new ArrayList<>((Collection<HashMap<String, ?>>) obj.get("extensions"));
         List<Extension> readExtensions = new ArrayList<Extension>();
         for (HashMap<String, ?> extensionMap : unreadExtensions) {
+            // Parse extension dependencies
+            List<ExtensionDependency> extensionDependencyList = new ArrayList<ExtensionDependency>();
+            if (extensionMap.get("dependencies") != null) {
+                List<HashMap<String, ?>> extensionDependencies = new ArrayList<>(
+                    (Collection<HashMap<String, ?>>) extensionMap.get("dependencies")
+                );
+                for (HashMap<String, ?> dependency : extensionDependencies) {
+                    extensionDependencyList.add(
+                        new ExtensionDependency(
+                            dependency.get("uniqueId").toString(),
+                            Version.fromString(dependency.get("version").toString())
+                        )
+                    );
+                }
+            }
+            // Create extension read from yml config
             readExtensions.add(
                 new Extension(
                     extensionMap.get("name").toString(),
@@ -573,7 +589,8 @@ private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOException {
                     extensionMap.get("port").toString(),
                     extensionMap.get("version").toString(),
                     extensionMap.get("opensearchVersion").toString(),
-                    extensionMap.get("minimumCompatibleVersion").toString()
+                    extensionMap.get("minimumCompatibleVersion").toString(),
+                    extensionDependencyList
                 )
             );
         }
diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java
index 51815a002d1df..fd11aec973d42 100644
--- a/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java
+++ b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java
@@ -52,7 +52,8 @@ public Extension(
             String port,
             String version,
             String opensearchVersion,
-            String minimumCompatibleVersion
+            String minimumCompatibleVersion,
+            List<ExtensionDependency> dependencies
         ) {
             this.name = name;
             this.uniqueId = uniqueId;
@@ -61,6 +62,7 @@ public Extension(
             this.version = version;
             this.opensearchVersion = opensearchVersion;
             this.minimumCompatibleVersion = minimumCompatibleVersion;
+            this.dependencies = dependencies;
         }
 
         public Extension() {
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 3af663d7b41f9..efd2686b41a20 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -179,20 +179,11 @@ private boolean isRefreshAfterCommit() throws IOException {
     }
 
     String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
-        // We use lastRefreshedCheckpoint as local checkpoint for the SegmentInfosSnapshot. This is better than using
-        // getProcessedLocalCheckpoint() as processedCheckpoint can advance between reading the value and setting up
-        // in SegmentInfos.userData. This could lead to data loss as, during recovery, translog will be replayed based on
-        // LOCAL_CHECKPOINT_KEY.
-        // lastRefreshedCheckpoint is updated after refresh listeners are executed, this means, InternalEngine.lastRefreshedCheckpoint()
-        // will return checkpoint of last refresh but that does not impact the correctness as duplicate sequence numbers
-        // will not be replayed.
-        assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: "
-            + indexShard.getEngine().getClass();
-        final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
+        final long maxSeqNoFromSegmentInfos = indexShard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfosSnapshot);
 
         Map<String, String> userData = segmentInfosSnapshot.getUserData();
-        userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
-        userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
+        userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNoFromSegmentInfos));
+        userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNoFromSegmentInfos));
         segmentInfosSnapshot.setUserData(userData, false);
 
         long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);
diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
index 0cfaa5234c1fe..a3a6eba39e126 100644
--- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
+++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
@@ -8,6 +8,9 @@
 
 package org.opensearch.index.translog;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.action.ActionListener;
+import org.opensearch.common.SetOnce;
 import org.opensearch.common.io.FileSystemUtils;
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lease.Releasables;
@@ -26,12 +29,15 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BooleanSupplier;
 import java.util.function.LongConsumer;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 /**
  * A Translog implementation which syncs local FS with a remote store
@@ -51,6 +57,12 @@ public class RemoteFsTranslog extends Translog {
 
     private volatile long minSeqNoToKeep;
 
+    // min generation referred to by the last uploaded translog
+    private volatile long minRemoteGenReferenced;
+
+    // clean up translog folders uploaded by previous primaries once
+    private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();
+
     public RemoteFsTranslog(
         TranslogConfig config,
         String translogUUID,
@@ -230,6 +242,7 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
             transferReleasable.close();
             closeFilesIfNoPendingRetentionLocks();
             maxRemoteTranslogGenerationUploaded = generation;
+            minRemoteGenReferenced = getMinFileGeneration();
             logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
         }
 
@@ -327,13 +340,72 @@ protected void setMinSeqNoToKeep(long seqNo) {
         this.minSeqNoToKeep = seqNo;
     }
 
-    @Override
-    void deleteReaderFiles(TranslogReader reader) {
+    private void deleteRemoteGeneration(Set<Long> generations) {
         try {
-            translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), reader.generation);
-        } catch (IOException ignored) {
-            logger.error("Exception {} while deleting generation {}", ignored, reader.generation);
+            translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations);
+        } catch (IOException e) {
+            logger.error(() -> new ParameterizedMessage("Exception occurred while deleting generation {}", generations), e);
         }
     }
+    @Override
+    public void trimUnreferencedReaders() throws IOException {
+        // clean up local translog files and update readers
+        super.trimUnreferencedReaders();
+
+        // clean up remote translog files not referenced in the latest uploaded metadata.
+        // This enables us to restore the translog from the metadata in case of failover or relocation.
+        Set<Long> generationsToDelete = new HashSet<>();
+        for (long generation = minRemoteGenReferenced - 1; generation >= 0; generation--) {
+            if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) {
+                break;
+            }
+            generationsToDelete.add(generation);
+        }
+        if (generationsToDelete.isEmpty() == false) {
+            deleteRemoteGeneration(generationsToDelete);
+            deleteOlderPrimaryTranslogFilesFromRemoteStore();
+        }
+    }
+
+    /**
+     * This method must be called only after there are valid generations to delete in trimUnreferencedReaders, as it
+     * implicitly ensures that the minimum primary term in the latest translog metadata in the remote store is the
+     * current primary term.
+     */
+    private void deleteOlderPrimaryTranslogFilesFromRemoteStore() {
+        // The deletion of older translog files in the remote store is on a best-effort basis, so there is a possibility
+        // that there are older files that are no longer needed and should be cleaned up. Here, we delete all files that
+        // are part of an older primary term.
+        if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
+            logger.info("Cleaning up translog uploaded by previous primaries");
+            long minPrimaryTermInMetadata = current.getPrimaryTerm();
+            Set<Long> primaryTermsInRemote;
+            try {
+                primaryTermsInRemote = translogTransferManager.listPrimaryTerms();
+            } catch (IOException e) {
+                logger.error("Exception occurred while getting primary terms from remote store", e);
+                // If an exception is encountered, we try to delete all older primary terms that are less than the
+                // minimum referenced primary term in the remote translog metadata.
+                primaryTermsInRemote = LongStream.range(0, current.getPrimaryTerm()).boxed().collect(Collectors.toSet());
+            }
+            // Delete all primary terms that are no longer referenced by the metadata file and exist in the remote store
+            Set<Long> primaryTermsToDelete = primaryTermsInRemote.stream()
+                .filter(term -> term < minPrimaryTermInMetadata)
+                .collect(Collectors.toSet());
+            primaryTermsToDelete.forEach(term -> translogTransferManager.deleteTranslogAsync(term, new ActionListener<>() {
+                @Override
+                public void onResponse(Void response) {
+                    // NO-OP
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error(
+                        () -> new ParameterizedMessage("Exception occurred while deleting older translog files for primary_term={}", term),
+                        e
+                    );
+                }
+            }));
+        }
-        super.deleteReaderFiles(reader);
     }
 }
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
index 78a26baa052ef..08a98a491a035 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java
@@ -81,8 +81,37 @@ public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException {
         blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
     }
 
+    @Override
+    public void deleteBlobsAsync(Iterable<String> path, List<String> fileNames, ActionListener<Void> listener) {
+        executorService.execute(() -> {
+            try {
+                blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
+                listener.onResponse(null);
+            } catch (IOException e) {
+                listener.onFailure(e);
+            }
+        });
+    }
+
+    @Override
+    public void deleteAsync(Iterable<String> path, ActionListener<Void> listener) {
+        executorService.execute(() -> {
+            try {
+                blobStore.blobContainer((BlobPath) path).delete();
+                listener.onResponse(null);
+            } catch (IOException e) {
+                listener.onFailure(e);
+            }
+        });
+    }
+
     @Override
     public Set<String> listAll(Iterable<String> path) throws IOException {
         return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
     }
+
+    @Override
+    public Set<String> listFolders(Iterable<String> path) throws IOException {
+        return blobStore.blobContainer((BlobPath) path).children().keySet();
+    }
 }
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java
index 5338142afed33..1909164bd821a 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java
@@ -13,6 +13,7 @@
 import org.opensearch.index.translog.transfer.listener.FileTransferListener;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -55,9 +56,14 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
         add(fileSnapshot.getName(), TransferState.FAILED);
     }
 
-    @Override
-    public void onDelete(String name) {
-        fileTransferTracker.remove(name);
+    public void delete(List<String> names) {
+        for (String name : names) {
+            fileTransferTracker.remove(name);
+        }
+    }
+
+    public boolean uploaded(String file) {
+        return fileTransferTracker.get(file) == TransferState.SUCCESS;
     }
 
     public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) {
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java
index 5745d0838efb3..5ba15ad01d44e 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java
@@ -45,6 +45,10 @@ void uploadBlobAsync(
 
     void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;
 
+    void deleteBlobsAsync(Iterable<String> path, List<String> fileNames, ActionListener<Void> listener);
+
+    void deleteAsync(Iterable<String> path, ActionListener<Void> listener);
+
     /**
      * Lists the files
      * @param path : the path to list
@@ -53,6 +57,14 @@
     */
    Set<String> listAll(Iterable<String> path) throws IOException;
 
+    /**
+     * Lists the folders inside the path.
+     * @param path : the path
+     * @return list of folders inside the path
+     * @throws IOException the exception while listing folders inside the path
+     */
+    Set<String> listFolders(Iterable<String> path) throws IOException;
+
    /**
     *
     * @param path the remote path from where download should be made
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
index 35ccb4ccf17db..48331f6528606 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
@@ -195,14 +195,68 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) {
         );
     }
 
-    public void deleteTranslog(long primaryTerm, long generation) throws IOException {
-        String ckpFileName = Translog.getCommitCheckpointFileName(generation);
-        String translogFilename = Translog.getFilename(generation);
-        // ToDo - Take care of metadata file cleanup
-        // https://github.com/opensearch-project/OpenSearch/issues/5677
-        fileTransferTracker.onDelete(ckpFileName);
-        fileTransferTracker.onDelete(translogFilename);
-        List<String> files = List.of(ckpFileName, translogFilename);
-        transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files);
+    /**
+     * This method handles deletion of multiple generations for a single primary term.
+     * TODO: Take care of metadata file cleanup. Github Issue #5677
+     *
+     * @param primaryTerm primary term
+     * @param generations set of generations
+     */
+    public void deleteTranslogAsync(long primaryTerm, Set<Long> generations) throws IOException {
+        if (generations.isEmpty()) {
+            return;
+        }
+        List<String> files = new ArrayList<>();
+        generations.forEach(generation -> {
+            String ckpFileName = Translog.getCommitCheckpointFileName(generation);
+            String translogFilename = Translog.getFilename(generation);
+            files.addAll(List.of(ckpFileName, translogFilename));
+        });
+        transferService.deleteBlobsAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files, new ActionListener<>() {
+            @Override
+            public void onResponse(Void unused) {
+                fileTransferTracker.delete(files);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.error(
+                    () -> new ParameterizedMessage(
+                        "Exception occurred while deleting translog for primary_term={} generations={}",
+                        primaryTerm,
+                        generations
+                    ),
+                    e
+                );
+            }
+        });
+    }
+
+    /**
+     * Handles deletion of translog files for a particular primary term.
+     *
+     * @param primaryTerm primary term
+     * @param listener listener for response and failure
+     */
+    public void deleteTranslogAsync(long primaryTerm, ActionListener<Void> listener) {
+        transferService.deleteAsync(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), listener);
+    }
+
+    /**
+     * Lists all primary terms existing on the remote store.
+     *
+     * @return the set of primary terms
+     * @throws IOException is thrown if it cannot read the data
+     */
+    public Set<Long> listPrimaryTerms() throws IOException {
+        return transferService.listFolders(remoteBaseTransferPath).stream().filter(s -> {
+            try {
+                Long.parseLong(s);
+                return true;
+            } catch (Exception ignored) {
+                // NO-OP
+            }
+            return false;
+        }).map(Long::parseLong).collect(Collectors.toSet());
     }
 }
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java
index c489e4b9a5809..af78cb50b63c6 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java
@@ -30,5 +30,4 @@ public interface FileTransferListener {
      */
     void onFailure(TransferFileSnapshot fileSnapshot, Exception e);
 
-    void onDelete(String name);
 }
diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java b/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java
index b0c828ca6b902..e1c6ac4150a5d 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java
@@ -57,7 +57,7 @@ ObjectParser apply(
             MultiTermsValuesSourceConfig.Builder::new
         );
 
-        BaseMultiValuesSourceFieldConfig.PARSER.apply(parser, scriptable, timezoneAware);
+        BaseMultiValuesSourceFieldConfig.PARSER.accept(parser, scriptable, timezoneAware);
 
         if (valueTypeHinted) {
             parser.declareField(
diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java b/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java
index f31d17919e7e3..a0fc23f328364 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java
@@ -68,7 +68,7 @@ public class MultiValuesSourceFieldConfig extends BaseMultiValuesSourceFieldConfig {
             MultiValuesSourceFieldConfig.Builder::new
         );
 
-        BaseMultiValuesSourceFieldConfig.PARSER.apply(parser, scriptable, timezoneAware);
+        BaseMultiValuesSourceFieldConfig.PARSER.accept(parser, scriptable, timezoneAware);
 
         if (filtered) {
             parser.declareField(
indexBalance = 1.0f; - final float replicaBalance = 0.0f; + final float shardBalance = 0.0f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); @@ -84,7 +84,7 @@ public void testIndexBalance() { ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); @@ -123,10 +123,148 @@ public void testIndexBalance() { ); } - public void testReplicaBalance() { + /** + * This test verifies that with only primary shard balance, the primary shard distribution is balanced within thresholds. + */ + public void testPrimaryBalance() { + /* Tests balance over primary shards only */ + final float indexBalance = 0.0f; + final float shardBalance = 0.0f; + final float primaryBalance = 1.0f; + final float balanceThreshold = 1.0f; + + Settings.Builder settings = Settings.builder(); + settings.put( + ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), + ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() + ); + settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); + settings.put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), primaryBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); + settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); + + AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); + + ClusterState clusterState = initCluster(strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + + clusterState = addNode(clusterState, strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes + 1, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + + clusterState = removeNodes(clusterState, strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + (numberOfNodes + 1) - (numberOfNodes + 1) / 2, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + } + + /** + * This test verifies that with a default-like mix of index, shard and primary balance factors, both the primary shard + * distribution and the per-index shard distribution stay within the balance threshold. + */ + public void testBalanceDefaults() { + final float indexBalance = 0.55f; + final float shardBalance = 0.45f; + final float primaryBalance = 0.40f; + final float balanceThreshold = 1.0f; + + Settings.Builder settings = Settings.builder(); + settings.put( + ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), + ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() + ); + settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); + settings.put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), primaryBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); + settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(),
balanceThreshold); + + AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); + + ClusterState clusterState = initCluster(strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + assertIndexBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + + clusterState = addNode(clusterState, strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes + 1, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + assertIndexBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + numberOfNodes + 1, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + + clusterState = removeNodes(clusterState, strategy); + assertPrimaryBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + (numberOfNodes + 1) - (numberOfNodes + 1) / 2, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + assertIndexBalance( + clusterState.getRoutingTable(), + clusterState.getRoutingNodes(), + (numberOfNodes + 1) - (numberOfNodes + 1) / 2, + numberOfIndices, + numberOfReplicas, + numberOfShards, + balanceThreshold + ); + } + + public void testShardBalance() { /* Tests balance over replicas only */ final float indexBalance = 0.0f; - final float replicaBalance = 1.0f; + final float shardBalance = 1.0f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); @@ -135,13 +273,13 @@ public void testReplicaBalance() { ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); ClusterState clusterState = initCluster(strategy); - assertReplicaBalance( + assertShardBalance( clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, @@ -151,7 +289,7 @@ public void testReplicaBalance() { ); clusterState = addNode(clusterState, strategy); - assertReplicaBalance( + assertShardBalance( clusterState.getRoutingNodes(), numberOfNodes + 1, numberOfIndices, @@ -161,7 +299,7 @@ public void testReplicaBalance() { ); clusterState = removeNodes(clusterState, strategy); - assertReplicaBalance( + assertShardBalance( clusterState.getRoutingNodes(), numberOfNodes + 1 - (numberOfNodes + 1) / 2, numberOfIndices, @@ -254,7 +392,7 @@ private ClusterState removeNodes(ClusterState clusterState, AllocationService st return applyStartedShardsUntilNoChange(clusterState, strategy); } - private void assertReplicaBalance( + private void assertShardBalance( RoutingNodes nodes, int numberOfNodes, int numberOfIndices, @@ -309,6 +447,27 @@ private void assertIndexBalance( } } + private void assertPrimaryBalance( + RoutingTable routingTable, + RoutingNodes nodes, + int numberOfNodes, + int numberOfIndices, + int numberOfReplicas, + int numberOfShards, + float threshold + ) { + + 
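// Each node's count of started primary shards must fall within the threshold band around the per-node average. +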
final int numShards = numberOfShards * numberOfIndices; + final float avgNumShards = (float) (numShards) / (float) (numberOfNodes); + final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - threshold))); + final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + threshold))); + + for (RoutingNode node : nodes) { + assertThat(node.primaryShardsWithState(STARTED).size(), Matchers.greaterThanOrEqualTo(minAvgNumberOfShards)); + assertThat(node.primaryShardsWithState(STARTED).size(), Matchers.lessThanOrEqualTo(maxAvgNumberOfShards)); + } + } + public void testPersistedSettings() { Settings.Builder settings = Settings.builder(); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), 0.2); diff --git a/server/src/test/java/org/opensearch/common/settings/SettingTests.java b/server/src/test/java/org/opensearch/common/settings/SettingTests.java index 7703cb394397e..45810b421c027 100644 --- a/server/src/test/java/org/opensearch/common/settings/SettingTests.java +++ b/server/src/test/java/org/opensearch/common/settings/SettingTests.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.LogEvent; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.collect.Triplet; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.AbstractScopedSettings.SettingUpdater; import org.opensearch.common.settings.Setting.Property; @@ -625,6 +626,28 @@ public void validate(Integer a, Integer b) { } } + // This test class is used to verify the behavior of BalancedShardsAllocator.WeightFunction and to ensure the set function is called + // whenever any of the settings changes. + public static class TriSettingConsumer { + + private Integer b; + private Integer a; + + private Integer c; + + public void set(Integer a, Integer b, Integer c) { + this.a = a; + this.b = b; + this.c = c; + } + + public void validate(Integer a, Integer b, Integer c) { + if (Integer.signum(a) != Integer.signum(b) || Integer.signum(a) != Integer.signum(c)) { + throw new IllegalArgumentException("boom"); + } + } + } + public void testComposite() { Composite c = new Composite(); Setting<Integer> a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); @@ -689,6 +712,110 @@ public void testCompositeValidator() { } + public void testTriSettingConsumer() { + TriSettingConsumer consumer = new TriSettingConsumer(); + Setting<Integer> a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); + Setting<Integer> b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope); + Setting<Integer> c = Setting.intSetting("foo.int.bar.c", 1, Property.Dynamic, Property.NodeScope); + ClusterSettings.SettingUpdater<Settings> settingUpdater = Setting.compoundUpdater( + consumer::set, + consumer::validate, + a, + b, + c, + logger + ); + assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY)); + assertNull(consumer.a); + assertNull(consumer.b); + assertNull(consumer.c); + + Settings build = Settings.builder().put("foo.int.bar.a", 2).build(); + assertTrue(settingUpdater.apply(build, Settings.EMPTY)); + assertEquals(2, consumer.a.intValue()); + assertEquals(1, consumer.b.intValue()); + assertEquals(1, consumer.c.intValue()); + + Integer aValue = consumer.a; + assertFalse(settingUpdater.apply(build, build)); + assertSame(aValue, consumer.a); + Settings previous = build; + build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build(); +
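// Updating b alone must still fire the consumer, carrying forward the current values of a and c. +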
assertTrue(settingUpdater.apply(build, previous)); + assertEquals(2, consumer.a.intValue()); + assertEquals(5, consumer.b.intValue()); + + Integer bValue = consumer.b; + assertFalse(settingUpdater.apply(build, build)); + assertSame(bValue, consumer.b); + previous = build; + build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).put("foo.int.bar.c", 10).build(); + assertTrue(settingUpdater.apply(build, previous)); + assertEquals(2, consumer.a.intValue()); + assertEquals(5, consumer.b.intValue()); + assertEquals(10, consumer.c.intValue()); + + // reset to default + assertTrue(settingUpdater.apply(Settings.EMPTY, build)); + assertEquals(1, consumer.a.intValue()); + assertEquals(1, consumer.b.intValue()); + assertEquals(1, consumer.c.intValue()); + } + + public void testTriSettingConsumerValidator() { + TriSettingConsumer consumer = new TriSettingConsumer(); + Setting<Integer> a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); + Setting<Integer> b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope); + Setting<Integer> c = Setting.intSetting("foo.int.bar.c", 1, Property.Dynamic, Property.NodeScope); + ClusterSettings.SettingUpdater<Settings> settingUpdater = Setting.compoundUpdater( + consumer::set, + consumer::validate, + a, + b, + c, + logger + ); + assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY)); + assertNull(consumer.a); + assertNull(consumer.b); + assertNull(consumer.c); + + Settings build = Settings.builder().put("foo.int.bar.a", 2).build(); + assertTrue(settingUpdater.apply(build, Settings.EMPTY)); + assertEquals(2, consumer.a.intValue()); + assertEquals(1, consumer.b.intValue()); + assertEquals(1, consumer.c.intValue()); + + Integer aValue = consumer.a; + assertFalse(settingUpdater.apply(build, build)); + assertSame(aValue, consumer.a); + final Settings previous = build; + build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build(); + assertTrue(settingUpdater.apply(build, previous)); + assertEquals(2, consumer.a.intValue()); + assertEquals(5, consumer.b.intValue()); + + Integer bValue = consumer.b; + assertFalse(settingUpdater.apply(build, build)); + assertSame(bValue, consumer.b); + final Settings previous2 = build; + build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).put("foo.int.bar.c", 10).build(); + assertTrue(settingUpdater.apply(build, previous)); + assertEquals(2, consumer.a.intValue()); + assertEquals(5, consumer.b.intValue()); + assertEquals(10, consumer.c.intValue()); + + Settings invalid = Settings.builder().put("foo.int.bar.a", -2).put("foo.int.bar.b", 5).build(); + IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(invalid, previous2)); + assertThat(exc.getMessage(), equalTo("boom")); + + // reset to default + assertTrue(settingUpdater.apply(Settings.EMPTY, build)); + assertEquals(1, consumer.a.intValue()); + assertEquals(1, consumer.b.intValue()); + assertEquals(1, consumer.c.intValue()); + } + public void testListSettingsDeprecated() { final Setting<List<String>> deprecatedListSetting = Setting.listSetting( "foo.deprecated", diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 44ca2175621e7..05ff119cbe082 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -124,7 +124,7 @@ public class
ExtensionsManagerTests extends OpenSearchTestCase { " minimumCompatibleVersion: '2.0.0'", " dependencies:", " - uniqueId: 'uniqueid0'", - " - version: '2.0.0'" + " version: '2.0.0'" ); private DiscoveryExtensionNode extensionNode; @@ -236,8 +236,7 @@ public void testDiscover() throws Exception { assertEquals(extension.getAttributes(), initializedExtension.getAttributes()); assertEquals(extension.getVersion(), initializedExtension.getVersion()); assertEquals(extension.getMinimumCompatibleVersion(), initializedExtension.getMinimumCompatibleVersion()); - // TODO: Will fail due to bug : https://github.com/opensearch-project/OpenSearch/issues/6115 - // assertEquals(extension.getDependencies(), initializedExtension.getDependencies()); + assertEquals(extension.getDependencies(), initializedExtension.getDependencies()); } } @@ -272,10 +271,9 @@ public void testNonUniqueExtensionsDiscovery() throws Exception { assertEquals(extension.getAttributes(), initializedExtension.getAttributes()); assertEquals(extension.getVersion(), initializedExtension.getVersion()); assertEquals(extension.getMinimumCompatibleVersion(), initializedExtension.getMinimumCompatibleVersion()); + assertEquals(extension.getDependencies(), initializedExtension.getDependencies()); } assertTrue(expectedExtensions.containsAll(emptyList())); - // TODO: Will fail due to bug : https://github.com/opensearch-project/OpenSearch/issues/6115 - // assertEquals(extension.getDependencies(), initializedExtension.getDependencies()); } public void testDiscoveryExtension() throws Exception { diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 0e728ca3f1d4e..cb3affb71b3dc 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -465,7 +465,7 @@ public void testRangeSnapshot() throws Exception { } } - public void testSimpleOperationsUpload() throws IOException { + public void testSimpleOperationsUpload() throws Exception { ArrayList<Translog.Operation> ops = new ArrayList<>(); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.size(0)); @@ -477,18 +477,18 @@ public void testSimpleOperationsUpload() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - assertEquals(translog.allUploaded().size(), 2); + assertEquals(4, translog.allUploaded().size()); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); - assertEquals(translog.allUploaded().size(), 4); + assertEquals(6, translog.allUploaded().size()); translog.rollGeneration(); - assertEquals(translog.allUploaded().size(), 4); + assertEquals(6, translog.allUploaded().size()); Set<String> mdFiles = blobStoreTransferService.listAll( repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata") ); - assertEquals(mdFiles.size(), 2); + assertEquals(2, mdFiles.size()); logger.info("All md files {}", mdFiles); Set<String> tlogFiles = blobStoreTransferService.listAll( @@ -529,33 +529,48 @@ public void testSimpleOperationsUpload() throws IOException { translog.deletionPolicy.setLocalCheckpointOfSafeCommit(0); // simulating the remote segment upload.
translog.setMinSeqNoToKeep(0); - // This should not trim anything + // This should not trim anything from local translog.trimUnreferencedReaders(); - assertEquals(translog.allUploaded().size(), 4); - assertEquals( - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(String.valueOf(primaryTerm.get())) - ).size(), - 4 - ); + assertEquals(2, translog.readers.size()); + assertBusy(() -> { + assertEquals(4, translog.allUploaded().size()); + assertEquals( + 4, + blobStoreTransferService.listAll( + repository.basePath() + .add(shardId.getIndex().getUUID()) + .add(String.valueOf(shardId.id())) + .add(String.valueOf(primaryTerm.get())) + ).size() + ); + }); - // This should trim tlog-2.* files as it contains seq no 0 + // This should trim tlog-2 from local + // This should not trim tlog-2.* files from remote as we are not uploading any more translogs to remote translog.setMinSeqNoToKeep(1); + translog.deletionPolicy.setLocalCheckpointOfSafeCommit(1); translog.trimUnreferencedReaders(); - assertEquals(translog.allUploaded().size(), 2); - assertEquals( - blobStoreTransferService.listAll( - repository.basePath() - .add(shardId.getIndex().getUUID()) - .add(String.valueOf(shardId.id())) - .add(String.valueOf(primaryTerm.get())) - ).size(), - 2 - ); + assertEquals(1, translog.readers.size()); + assertBusy(() -> { + assertEquals(4, translog.allUploaded().size()); + assertEquals( + 4, + blobStoreTransferService.listAll( + repository.basePath() + .add(shardId.getIndex().getUUID()) + .add(String.valueOf(shardId.id())) + .add(String.valueOf(primaryTerm.get())) + ).size() + ); + }); + // this should now trim tlog-2 files from remote, but not tlog-3 and tlog-4 + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); + translog.deletionPolicy.setLocalCheckpointOfSafeCommit(1); + translog.setMinSeqNoToKeep(2); + translog.trimUnreferencedReaders(); + assertEquals(1, translog.readers.size()); + assertBusy(() -> assertEquals(4, translog.allUploaded().size())); } private Long populateTranslogOps(boolean withMissingOps) throws IOException { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java index c6b4579f5ddd1..be14e4a7bd380 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java @@ -15,6 +15,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.List; public class FileTransferTrackerTests extends OpenSearchTestCase { @@ -74,4 +75,24 @@ public void testOnFailure() throws IOException { } } + public void testUploaded() throws IOException { + fileTransferTracker = new FileTransferTracker(shardId); + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + try ( + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong() + ) + ) { + fileTransferTracker.onSuccess(transferFileSnapshot); + String fileName = String.valueOf(testFile.getFileName()); + assertTrue(fileTransferTracker.uploaded(fileName)); + assertFalse(fileTransferTracker.uploaded("random-name")); + +
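// delete() should drop the file from the tracker so it no longer reports as uploaded. +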
fileTransferTracker.delete(List.of(fileName)); + assertFalse(fileTransferTracker.uploaded(fileName)); + } + } + } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 0677879549905..0abbfcd3eb69c 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -12,31 +12,35 @@ import org.apache.lucene.util.Constants; import org.mockito.Mockito; import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; -import org.opensearch.test.OpenSearchTestCase; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; -import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("*") public class TranslogTransferManagerTests extends OpenSearchTestCase { @@ -74,23 +78,25 @@ public void testTransferSnapshot() throws IOException { return null; }).when(transferService).uploadBlobAsync(any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { + @Override + public void onSuccess(TransferFileSnapshot fileSnapshot) { + fileTransferSucceeded.incrementAndGet(); + super.onSuccess(fileSnapshot); + } + + @Override + public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { + fileTransferFailed.incrementAndGet(); + super.onFailure(fileSnapshot, e); + } + + }; + TranslogTransferManager translogTransferManager = new TranslogTransferManager( transferService, remoteBaseTransferPath, - new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { - @Override - public void onSuccess(TransferFileSnapshot fileSnapshot) { - fileTransferSucceeded.incrementAndGet(); - } - - @Override - public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { - fileTransferFailed.incrementAndGet(); - } - 
- @Override - public void onDelete(String name) {} - } + fileTransferTracker ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -108,6 +114,7 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { assertEquals(0, fileTransferFailed.get()); assertEquals(1, translogTransferSucceeded.get()); assertEquals(0, translogTransferFailed.get()); + assertEquals(4, fileTransferTracker.allUploaded().size()); } private TransferSnapshot createTransferSnapshot() { @@ -295,6 +302,52 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { // Since the tracker already holds the files with success state, adding them with success state is allowed tracker.add(translogFile, true); tracker.add(checkpointFile, true); + } + + public void testDeleteTranslogSuccess() throws Exception { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService( + blobStore, + OpenSearchExecutors.newDirectExecutorService() + ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + blobStoreTransferService, + remoteBaseTransferPath, + tracker + ); + String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; + tracker.add(translogFile, true); + tracker.add(checkpointFile, true); + assertEquals(2, tracker.allUploaded().size()); + + List<String> files = List.of(checkpointFile, translogFile); + translogTransferManager.deleteTranslogAsync(primaryTerm, Set.of(19L)); + assertBusy(() -> assertEquals(0, tracker.allUploaded().size())); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(files)); + } + + public void testDeleteTranslogFailure() throws Exception { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + BlobStore blobStore = mock(BlobStore.class); + doAnswer(invocation -> { throw new IOException("test exception"); }).when(blobStore).blobContainer(any(BlobPath.class)); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService( + blobStore, + OpenSearchExecutors.newDirectExecutorService() + ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + blobStoreTransferService, + remoteBaseTransferPath, + tracker + ); + String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; + tracker.add(translogFile, true); + tracker.add(checkpointFile, true); + assertEquals(2, tracker.allUploaded().size()); + translogTransferManager.deleteTranslogAsync(primaryTerm, Set.of(19L)); + assertEquals(2, tracker.allUploaded().size()); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java index 8c0087ca0b87d..b38d4eee850ef 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java @@ -385,7 +385,7 @@ private void sumRandomDocsTestCase( builder, new MatchAllDocsQuery(), writer ->
writer.addDocuments(docs), - internalSum -> verify.apply(finalSum, docs, internalSum), + internalSum -> verify.accept(finalSum, docs, internalSum), fieldType ); } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 86123012fee5d..11280ddfeadef 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2443,4 +2443,10 @@ public void manageReplicaSettingForDefaultReplica(boolean apply) { updateSettingsRequest.persistentSettings(settings); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } + + protected String primaryNodeName(String indexName) { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).primaryShard().currentNodeId(); + return clusterState.getRoutingNodes().node(nodeId).node().getName(); + } }
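Reviewer note: the sketch below shows how the new remote-translog cleanup APIs are meant to compose; it is illustrative only. Only listPrimaryTerms() and the two deleteTranslogAsync overloads come from this change; the cleanupStaleRemoteTerms helper, the manager, currentPrimaryTerm and logger names, and the call site are assumptions made for the example.

// Hypothetical caller-side housekeeping built on the APIs added in this diff.
// Assumes an org.apache.logging.log4j.Logger named `logger` is in scope.
void cleanupStaleRemoteTerms(TranslogTransferManager manager, long currentPrimaryTerm) throws IOException {
    // Remote folders are named by primary term, so anything below the current term is stale.
    for (long term : manager.listPrimaryTerms()) {
        if (term < currentPrimaryTerm) {
            // The listener-based overload drops the whole primary-term directory asynchronously.
            manager.deleteTranslogAsync(term, new ActionListener<Void>() {
                @Override
                public void onResponse(Void unused) {
                    logger.debug("deleted remote translog for stale primary term {}", term);
                }

                @Override
                public void onFailure(Exception e) {
                    logger.error("failed to delete remote translog for primary term " + term, e);
                }
            });
        }
    }
    // Within the live term, individual generations are removed instead via
    // manager.deleteTranslogAsync(currentPrimaryTerm, staleGenerations).
}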