From b936e9d561629e52b719e2b61de2a8e94b5f048f Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Mon, 10 Jul 2023 17:12:25 -0700 Subject: [PATCH] Add restore level safeguards to prevent file cache oversubscription Signed-off-by: Kunal Kotwani --- CHANGELOG.md | 1 + .../restore/RestoreSnapshotRequest.java | 2 +- .../cluster/routing/RoutingTable.java | 10 ++ .../decider/DiskThresholdDecider.java | 6 +- .../common/settings/ClusterSettings.java | 2 + .../store/remote/filecache/FileCache.java | 17 ++- .../main/java/org/opensearch/node/Node.java | 5 +- .../opensearch/snapshots/RestoreService.java | 83 ++++++++-- .../restore/RestoreSnapshotRequestTests.java | 4 + .../cluster/routing/RoutingTableTests.java | 43 ++++++ .../decider/DiskThresholdDeciderTests.java | 2 + .../snapshots/SnapshotResiliencyTests.java | 143 +++++++++++++++++- .../opensearch/test/OpenSearchTestCase.java | 2 + 13 files changed, 300 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f20b3b8d7b51..861674753861a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -79,6 +79,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add server version as REST response header [#6583](https://github.com/opensearch-project/OpenSearch/issues/6583) - Start replication checkpointTimers on primary before segments upload to remote store. ([#8221]()https://github.com/opensearch-project/OpenSearch/pull/8221) - [distribution/archives] [Linux] [x64] Provide the variant of the distributions bundled with JRE ([#8195]()https://github.com/opensearch-project/OpenSearch/pull/8195) +- Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java index 840564a4bd7a2..7a142e70305ae 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java @@ -517,7 +517,7 @@ public String snapshotUuid() { /** * Sets the storage type for this request. */ - RestoreSnapshotRequest storageType(StorageType storageType) { + public RestoreSnapshotRequest storageType(StorageType storageType) { this.storageType = storageType; return this; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index 7934649a6d3eb..d6a67bc714689 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -295,6 +295,16 @@ public ShardsIterator allShardsIncludingRelocationTargets(String[] indices) { return allShardsSatisfyingPredicate(indices, shardRouting -> true, true); } + /** + * All the shards on the node which match the predicate + * @param predicate condition to match + * @return iterator over shards matching the predicate + */ + public ShardsIterator allShardsSatisfyingPredicate(Predicate predicate) { + String[] indices = indicesRouting.keySet().toArray(new String[0]); + return allShardsSatisfyingPredicate(indices, predicate, false); + } + private ShardsIterator allShardsSatisfyingPredicate( String[] indices, Predicate predicate, diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 4b69c05807ae4..61b96184abcc4 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -68,7 +68,7 @@ import static org.opensearch.cluster.routing.RoutingPool.getShardPool; import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING; -import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO; +import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING; /** * The {@link DiskThresholdDecider} checks that the node a shard is potentially @@ -199,8 +199,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null); final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0; final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize; - - if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) { + final double dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings()); + if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) { return allocation.decision( Decision.NO, NAME, diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 46a43842451d9..9da9e1b14d307 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -44,6 +44,7 @@ import org.opensearch.index.ShardIndexingPressureMemoryManager; import org.opensearch.index.ShardIndexingPressureSettings; import org.opensearch.index.ShardIndexingPressureStore; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.search.backpressure.settings.NodeDuressSettings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; @@ -643,6 +644,7 @@ public void apply(Settings value, Settings current, Settings previous) { // Settings related to Searchable Snapshots Node.NODE_SEARCH_CACHE_SIZE_SETTING, + FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING, // Settings related to Remote Refresh Segment Pressure RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java index 3d23b4d22538c..47b891fdb8d21 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java @@ -11,6 +11,7 @@ import org.apache.lucene.store.IndexInput; import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.breaker.CircuitBreakingException; +import org.opensearch.common.settings.Setting; import org.opensearch.index.store.remote.utils.cache.CacheUsage; import org.opensearch.index.store.remote.utils.cache.RefCountedCache; import org.opensearch.index.store.remote.utils.cache.SegmentedCache; @@ -49,8 +50,20 @@ public class FileCache implements RefCountedCache { private final CircuitBreaker circuitBreaker; - // TODO: Convert the constant into an integer setting - public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5; + /** + * Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for + * the file cache. For example, if 100GB disk space is configured for use as a file cache and the + * remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots. + * This is designed to be a safeguard to prevent oversubscribing a cluster. + * Specify a value of zero for no limit, which is the default for compatibility reasons. + */ + public static final Setting DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting( + "cluster.filecache.remote_data_ratio", + 0.0, + 0.0, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); public FileCache(SegmentedCache cache, CircuitBreaker circuitBreaker) { this.theCache = cache; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index fe630dfe27e3a..0bc824c5a0704 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -941,8 +941,9 @@ protected Node( clusterModule.getAllocationService(), metadataCreateIndexService, metadataIndexUpgradeService, - clusterService.getClusterSettings(), - shardLimitValidator + shardLimitValidator, + indicesService, + clusterInfoService::getClusterInfo ); final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor( diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index d7e89172c5837..119e632928cc7 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -41,6 +41,7 @@ import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -69,6 +70,7 @@ import org.opensearch.cluster.routing.RoutingChangesObserver; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardsIterator; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterManagerTaskKeys; @@ -87,6 +89,9 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.store.remote.filecache.FileCacheStats; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.ShardLimitValidator; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; @@ -104,6 +109,7 @@ import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.unmodifiableSet; @@ -119,6 +125,8 @@ import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY; import static org.opensearch.common.util.set.Sets.newHashSet; import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION; +import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING; +import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING; import static org.opensearch.snapshots.SnapshotUtils.filterIndices; /** @@ -177,6 +185,10 @@ public class RestoreService implements ClusterStateApplier { private final ClusterSettings clusterSettings; + private final IndicesService indicesService; + + private final Supplier clusterInfoSupplier; + private final ClusterManagerTaskThrottler.ThrottlingKey restoreSnapshotTaskKey; private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(); @@ -187,8 +199,9 @@ public RestoreService( AllocationService allocationService, MetadataCreateIndexService createIndexService, MetadataIndexUpgradeService metadataIndexUpgradeService, - ClusterSettings clusterSettings, - ShardLimitValidator shardLimitValidator + ShardLimitValidator shardLimitValidator, + IndicesService indicesService, + Supplier clusterInfoSupplier ) { this.clusterService = clusterService; this.repositoriesService = repositoriesService; @@ -200,6 +213,8 @@ public RestoreService( } this.clusterSettings = clusterService.getClusterSettings(); this.shardLimitValidator = shardLimitValidator; + this.indicesService = indicesService; + this.clusterInfoSupplier = clusterInfoSupplier; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. restoreSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.RESTORE_SNAPSHOT_KEY, true); @@ -415,7 +430,6 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey( @Override public ClusterState execute(ClusterState currentState) { - RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); // Check if the snapshot to restore is currently being deleted SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( SnapshotDeletionsInProgress.TYPE, @@ -436,7 +450,9 @@ public ClusterState execute(ClusterState currentState) { ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable()); final Map shards; + final boolean isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString()); Set aliases = new HashSet<>(); + long totalRestorableRemoteIndexesSize = 0; if (indices.isEmpty() == false) { // We have some indices to restore @@ -447,17 +463,14 @@ public ClusterState execute(ClusterState currentState) { String index = indexEntry.getValue(); boolean partial = checkPartial(index); + IndexId snapshotIndexId = repositoryData.resolveIndexId(index); IndexMetadata snapshotIndexMetadata = updateIndexSettings( metadata.index(index), request.indexSettings(), request.ignoreIndexSettings() ); - if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) { - snapshotIndexMetadata = addSnapshotToIndexSettings( - snapshotIndexMetadata, - snapshot, - repositoryData.resolveIndexId(index) - ); + if (isRemoteSnapshot) { + snapshotIndexMetadata = addSnapshotToIndexSettings(snapshotIndexMetadata, snapshot, snapshotIndexId); } final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match( snapshotIndexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()) @@ -483,7 +496,7 @@ public ClusterState execute(ClusterState currentState) { restoreUUID, snapshot, snapshotInfo.version(), - repositoryData.resolveIndexId(index), + snapshotIndexId, isSearchableSnapshot, isRemoteStoreShallowCopy, request.getSourceRemoteStoreRepository() @@ -602,6 +615,14 @@ public ClusterState execute(ClusterState currentState) { } for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) { + if (isRemoteSnapshot) { + IndexShardSnapshotStatus.Copy shardStatus = repository.getShardSnapshotStatus( + snapshotInfo.snapshotId(), + snapshotIndexId, + new ShardId(metadata.index(index).getIndex(), shard) + ).asCopy(); + totalRestorableRemoteIndexesSize += shardStatus.getTotalSize(); + } if (!ignoreShards.contains(shard)) { shardsBuilder.put( new ShardId(renamedIndex, shard), @@ -638,6 +659,9 @@ public ClusterState execute(ClusterState currentState) { } checkAliasNameConflicts(indices, aliases); + if (isRemoteSnapshot) { + validateSearchableSnapshotRestorable(totalRestorableRemoteIndexesSize); + } Map updatedDataStreams = new HashMap<>(currentState.metadata().dataStreams()); updatedDataStreams.putAll( @@ -837,6 +861,45 @@ private IndexMetadata updateIndexSettings( return builder.settings(settingsBuilder).build(); } + private void validateSearchableSnapshotRestorable(long totalRestorableRemoteIndexesSize) { + ClusterInfo clusterInfo = clusterInfoSupplier.get(); + double remoteDataToFileCacheRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(clusterService.getSettings()); + Map nodeFileCacheStats = clusterInfo.getNodeFileCacheStats(); + if (nodeFileCacheStats.isEmpty() || remoteDataToFileCacheRatio <= 0.01f) { + return; + } + + long totalNodeFileCacheSize = clusterInfo.getNodeFileCacheStats() + .values() + .stream() + .map(fileCacheStats -> fileCacheStats.getTotal().getBytes()) + .mapToLong(Long::longValue) + .sum(); + + Predicate isRemoteSnapshotShard = shardRouting -> shardRouting.primary() + && indicesService.indexService(shardRouting.index()).getIndexSettings().isRemoteSnapshot(); + + ShardsIterator shardsIterator = clusterService.state() + .routingTable() + .allShardsSatisfyingPredicate(isRemoteSnapshotShard); + + long totalRestoredRemoteIndexesSize = shardsIterator.getShardRoutings() + .stream() + .map(clusterInfo::getShardSize) + .mapToLong(Long::longValue) + .sum(); + + if (totalRestoredRemoteIndexesSize + totalRestorableRemoteIndexesSize > remoteDataToFileCacheRatio + * totalNodeFileCacheSize) { + throw new SnapshotRestoreException( + snapshot, + "Size of the indexes to be restored exceeds the file cache bounds. Increase the file cache capacity on the cluster nodes using " + + NODE_SEARCH_CACHE_SIZE_SETTING.getKey() + + " setting." + ); + } + } + @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestTests.java index 5c20b3b262730..82b2cfb2e3e51 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequestTests.java @@ -112,6 +112,10 @@ private RestoreSnapshotRequest randomState(RestoreSnapshotRequest instance) { instance.snapshotUuid(randomBoolean() ? null : randomAlphaOfLength(10)); } + instance.storageType( + randomBoolean() ? RestoreSnapshotRequest.StorageType.LOCAL : RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT + ); + if (randomBoolean()) { instance.setSourceRemoteStoreRepository(randomAlphaOfLengthBetween(5, 10)); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java index 0ff9d6f07751a..53f1d71947f7c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java @@ -231,6 +231,49 @@ public void testShardsMatchingPredicateCount() { assertThat(clusterState.routingTable().shardsMatchingPredicateCount(predicate), is(2)); } + public void testAllShardsMatchingPredicate() { + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(RoutingTable.builder().addAsNew(metadata.index("test1")).addAsNew(metadata.index("test2")).build()) + .build(); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .build(); + clusterState = allocation.reroute(clusterState, "reroute"); + + Predicate predicate = s -> s.state() == ShardRoutingState.UNASSIGNED && s.unassignedInfo().isDelayed(); + assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(predicate).size(), is(0)); + + // starting primaries + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + // starting replicas + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + // remove node2 and reroute + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); + // make sure both replicas are marked as delayed (i.e. not reallocated) + clusterState = allocation.disassociateDeadNodes(clusterState, true, "reroute"); + assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(predicate).size(), is(2)); + + // Verifies true against all shards on the node (active/inactive) + assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(shard -> true).size(), is(4)); + // Verifies false against all shards on the node (active/inactive) + assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(shard -> false).size(), is(0)); + // Verifies against all primary shards on the node + assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(ShardRouting::primary).size(), is(2)); + // Verifies a predicate which tests for inactive replicas + assertThat( + clusterState.routingTable() + .allShardsSatisfyingPredicate(shardRouting -> !shardRouting.primary() && !shardRouting.active()) + .size(), + is(2) + ); + } + public void testActivePrimaryShardsGrouped() { assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], true).size(), is(0)); assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], false).size(), is(0)); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 2180a14f5bf30..bde8a45359814 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -69,6 +69,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheStats; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.EmptySnapshotsInfoService; @@ -405,6 +406,7 @@ public void testFileCacheRemoteShardsDecisions() { DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings); Metadata metadata = Metadata.builder() .put(IndexMetadata.builder("test").settings(remoteIndexSettings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) + .persistentSettings(Settings.builder().put(FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5).build()) .build(); RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index a121a190096b4..e4dec5163c400 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -105,6 +106,8 @@ import org.opensearch.client.AdminClient; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -176,7 +179,9 @@ import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; +import org.opensearch.index.store.remote.filecache.FileCacheStats; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.IndicesService; import org.opensearch.indices.ShardLimitValidator; @@ -213,6 +218,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.disruption.DisruptableMockTransport; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportInterceptor; import org.opensearch.transport.TransportRequest; @@ -245,6 +251,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.mockito.Mockito.when; import static org.opensearch.action.support.ActionTestUtils.assertNoFailureListener; import static org.opensearch.env.Environment.PATH_HOME_SETTING; import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; @@ -260,6 +267,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; +import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING; public class SnapshotResiliencyTests extends OpenSearchTestCase { @@ -413,6 +421,106 @@ public void testSuccessfulSnapshotAndRestore() { assertEquals(0, snapshotInfo.failedShards()); } + public void testSearchableSnapshotOverSubscription() { + setupTestCluster(1, 2, 2); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + final int shards = randomIntBetween(1, 10); + final int documents = randomIntBetween(0, 100); + + final TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager( + testClusterNodes.nodes.values().iterator().next().clusterService.state() + ); + + Map nodeFileCacheStats = new HashMap<>(); + for (TestClusterNodes.TestClusterNode node : testClusterNodes.nodes.values()) { + nodeFileCacheStats.put(node.node.getId(), new FileCacheStats(0, 1, 0, 0, 0, 0, 0)); + } + ClusterInfo clusterInfo = new ClusterInfo(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), nodeFileCacheStats); + testClusterNodes.nodes.values().forEach(node -> when(node.getMockClusterInfoService().getClusterInfo()).thenReturn(clusterInfo)); + + final StepListener createSnapshotResponseListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { + final Runnable afterIndexing = () -> client().admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .execute(createSnapshotResponseListener); + if (documents == 0) { + afterIndexing.run(); + } else { + final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < documents; ++i) { + bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i))); + } + final StepListener bulkResponseStepListener = new StepListener<>(); + client().bulk(bulkRequest, bulkResponseStepListener); + continueOrDie(bulkResponseStepListener, bulkResponse -> { + assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + assertEquals(documents, bulkResponse.getItems().length); + afterIndexing.run(); + }); + } + }); + + final StepListener deleteIndexListener = new StepListener<>(); + + continueOrDie( + createSnapshotResponseListener, + createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener) + ); + + final StepListener restoreSnapshotResponseListener = new StepListener<>(); + continueOrDie( + deleteIndexListener, + ignored -> client().admin() + .cluster() + .restoreSnapshot( + new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true) + .storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT), + restoreSnapshotResponseListener + ) + ); + + final AtomicBoolean exceptionVerified = new AtomicBoolean(); + + restoreSnapshotResponseListener.whenComplete(null, restoreSnapshotException -> { + Throwable throwable = restoreSnapshotException; + if (restoreSnapshotException instanceof RemoteTransportException) { + throwable = restoreSnapshotException.getCause(); + } + try { + assertTrue(throwable instanceof SnapshotRestoreException); + assertTrue( + throwable.getMessage() + .contains( + "Size of the indexes to be restored exceeds the file cache bounds. Increase the file cache capacity on the cluster nodes using " + + NODE_SEARCH_CACHE_SIZE_SETTING.getKey() + + " setting." + ) + ); + } catch (SnapshotRestoreException ignored) {} + exceptionVerified.set(true); + }); + + runUntil(exceptionVerified::get, TimeUnit.MINUTES.toMillis(5L)); + assertTrue(exceptionVerified.get()); + SnapshotsInProgress finalSnapshotsInProgress = clusterManagerNode.clusterService.state().custom(SnapshotsInProgress.TYPE); + assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); + final Repository repository = clusterManagerNode.repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + assertThat(snapshotIds, hasSize(1)); + + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); + assertEquals(shards, snapshotInfo.successfulShards()); + assertEquals(0, snapshotInfo.failedShards()); + } + public void testSnapshotWithNodeDisconnects() { final int dataNodes = randomIntBetween(2, 10); final int clusterManagerNodes = randomFrom(1, 3, 5); @@ -1415,6 +1523,11 @@ private void setupTestCluster(int clusterManagerNodes, int dataNodes) { startCluster(); } + private void setupTestCluster(int clusterManagerNodes, int dataNodes, int searchNodes) { + testClusterNodes = new TestClusterNodes(clusterManagerNodes, dataNodes, searchNodes); + startCluster(); + } + private void scheduleSoon(Runnable runnable) { deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + randomLongBetween(0, 100L), runnable); } @@ -1465,6 +1578,7 @@ private Environment createEnvironment(String nodeName) { ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING.getKey(), ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING.get(Settings.EMPTY) ) + .put(FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5) .put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1000) // o.w. some tests might block .build() ); @@ -1488,6 +1602,10 @@ private final class TestClusterNodes { private final Set disconnectedNodes = new HashSet<>(); TestClusterNodes(int clusterManagerNodes, int dataNodes) { + this(clusterManagerNodes, dataNodes, 0); + } + + TestClusterNodes(int clusterManagerNodes, int dataNodes, int searchNodes) { for (int i = 0; i < clusterManagerNodes; ++i) { nodes.computeIfAbsent("node" + i, nodeName -> { try { @@ -1506,6 +1624,15 @@ private final class TestClusterNodes { } }); } + for (int i = 0; i < searchNodes; ++i) { + nodes.computeIfAbsent("search-node" + i, nodeName -> { + try { + return newSearchNode(nodeName); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } } public TestClusterNode nodeById(final String nodeId) { @@ -1524,6 +1651,10 @@ private TestClusterNode newDataNode(String nodeName) throws IOException { return newNode(nodeName, DiscoveryNodeRole.DATA_ROLE); } + private TestClusterNode newSearchNode(String nodeName) throws IOException { + return newNode(nodeName, DiscoveryNodeRole.SEARCH_ROLE); + } + private TestClusterNode newNode(String nodeName, DiscoveryNodeRole role) throws IOException { return new TestClusterNode( new DiscoveryNode( @@ -1667,6 +1798,8 @@ private final class TestClusterNode { private final ThreadPool threadPool; + private final ClusterInfoService clusterInfoService; + private Coordinator coordinator; TestClusterNode(DiscoveryNode node) throws IOException { @@ -1784,6 +1917,7 @@ public void onFailure(final Exception e) { final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList()); final ScriptService scriptService = new ScriptService(settings, emptyMap(), emptyMap()); client = new NodeClient(settings, threadPool); + clusterInfoService = Mockito.mock(ClusterInfoService.class); final SetOnce rerouteServiceSetOnce = new SetOnce<>(); final SnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService( settings, @@ -1993,8 +2127,9 @@ public void onFailure(final Exception e) { new SystemIndices(emptyMap()), null ), - clusterSettings, - shardLimitValidator + shardLimitValidator, + indicesService, + clusterInfoService::getClusterInfo ); actions.put( PutMappingAction.INSTANCE, @@ -2207,6 +2342,10 @@ protected void assertSnapshotOrGenericThread() { } } + public ClusterInfoService getMockClusterInfoService() { + return clusterInfoService; + } + public void restart() { testClusterNodes.disconnectNode(this); final ClusterState oldState = this.clusterService.state(); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index cdaa42c592c06..14275d838e6a9 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -117,6 +117,7 @@ import org.opensearch.index.analysis.NamedAnalyzer; import org.opensearch.index.analysis.TokenFilterFactory; import org.opensearch.index.analysis.TokenizerFactory; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.indices.analysis.AnalysisModule; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.plugins.AnalysisPlugin; @@ -1201,6 +1202,7 @@ public static Settings.Builder settings(Version version) { public static Settings.Builder remoteIndexSettings(Version version) { Settings.Builder builder = Settings.builder() + .put(FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5) .put(IndexMetadata.SETTING_VERSION_CREATED, version) .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()); return builder;