From 2009e07e9619b155da4b52871e63b417e55daacd Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 27 Nov 2023 21:00:27 +0530 Subject: [PATCH] Make number of segment metadata files in remote segment store configurable (#11329) Signed-off-by: Sachin Kale --- CHANGELOG.md | 1 + .../opensearch/remotestore/RemoteStoreIT.java | 97 ++++++++++++++++++- .../common/settings/ClusterSettings.java | 1 + .../opensearch/index/shard/IndexShard.java | 6 ++ .../shard/RemoteStoreRefreshListener.java | 4 +- .../store/RemoteSegmentStoreDirectory.java | 6 ++ .../recovery/PeerRecoveryTargetService.java | 13 ++- .../indices/recovery/RecoverySettings.java | 33 +++++++ .../RemoteStoreRefreshListenerTests.java | 6 +- .../RecoverySettingsDynamicUpdateTests.java | 45 +++++++++ 10 files changed, 203 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f58f4e8d235b4..9759a32953c5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625)) - [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853)) - Add back half_float BKD based sort query optimization ([#11024](https://github.com/opensearch-project/OpenSearch/pull/11024)) +- Make number of segment metadata files in remote segment store configurable ([#11329](https://github.com/opensearch-project/OpenSearch/pull/11329)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index b3b4f8e10fd31..28294686d4370 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -34,6 +34,8 @@ import org.opensearch.test.transport.MockTransportService; import org.hamcrest.MatcherAssert; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; @@ -42,10 +44,10 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; -import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP; import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -167,7 +169,7 @@ public void testRemoteTranslogCleanup() throws Exception { } public void testStaleCommitDeletionWithInvokeFlush() throws Exception { - internalCluster().startNode(); + String dataNode = internalCluster().startNode(); createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); int numberOfIterations = randomIntBetween(5, 15); indexData(numberOfIterations, true, INDEX_NAME); @@ -177,17 +179,20 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception { .get() .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata"); + + IndexShard indexShard = getIndexShard(dataNode); + int lastNMetadataFilesToKeep = indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles(); // Delete is async. assertBusy(() -> { int actualFileCount = getFileCount(indexPath); - if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) { + if (numberOfIterations <= lastNMetadataFilesToKeep) { MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); } else { // As delete is async its possible that the file gets created before the deletion or after // deletion. MatcherAssert.assertThat( actualFileCount, - is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1)) + is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1)) ); } }, 30, TimeUnit.SECONDS); @@ -209,6 +214,44 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); } + public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception { + Settings.Builder settings = Settings.builder() + .put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3"); + internalCluster().startNode(settings); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, true, INDEX_NAME); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata"); + int actualFileCount = getFileCount(indexPath); + // We also allow (numberOfIterations + 1) as index creation also triggers refresh. + MatcherAssert.assertThat(actualFileCount, is(oneOf(4))); + } + + public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception { + Settings.Builder settings = Settings.builder() + .put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1"); + internalCluster().startNode(settings); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); + int numberOfIterations = randomIntBetween(12, 18); + indexData(numberOfIterations, true, INDEX_NAME); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata"); + int actualFileCount = getFileCount(indexPath); + // We also allow (numberOfIterations + 1) as index creation also triggers refresh. + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1))); + } + /** * Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster * default. @@ -532,4 +575,50 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte indexShard = getIndexShard(replicaShardNode); assertFalse(indexShard.isSearchIdleSupported()); } + + public void testFallbackToNodeToNodeSegmentCopy() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + List dataNodes = internalCluster().startDataOnlyNodes(2); + + // 1. Create index with 0 replica + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + // 2. Index docs + indexBulk(INDEX_NAME, 50); + flushAndRefresh(INDEX_NAME); + + // 3. Delete data from remote segment store + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path segmentDataPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/data"); + + try (Stream files = Files.list(segmentDataPath)) { + files.forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + // Ignore + } + }); + } + + // 4. Start recovery by changing number of replicas to 1 + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + // 5. Ensure green and verify number of docs + ensureGreen(INDEX_NAME); + assertBusy(() -> { + assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + }); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index a817e63328110..1ee177d861eca 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) { RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING, + RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f1873ac659400..405845d58f227 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -345,6 +345,7 @@ Runnable getGlobalCheckpointSyncer() { private final List internalRefreshListener = new ArrayList<>(); private final RemoteStoreFileDownloader fileDownloader; + private final RecoverySettings recoverySettings; public IndexShard( final ShardRouting shardRouting, @@ -469,6 +470,7 @@ public boolean shouldCache(Query query) { ? false : mapperService.documentMapper().mappers().containsTimeStampField(); this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; + this.recoverySettings = recoverySettings; this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); } @@ -567,6 +569,10 @@ public String getNodeId() { return translogConfig.getNodeId(); } + public RecoverySettings getRecoverySettings() { + return recoverySettings; + } + public RemoteStoreFileDownloader getFileDownloader() { return fileDownloader; } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index dd40327298874..b02d858ce8b39 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -79,8 +79,6 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL ); public static final Set EXCLUDE_FILES = Set.of("write.lock"); - // Visible for testing - public static final int LAST_N_METADATA_FILES_TO_KEEP = 10; private final IndexShard indexShard; private final Directory storeDirectory; @@ -205,7 +203,7 @@ private boolean syncSegments() { // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. // This is done to avoid delete post each refresh. if (isRefreshAfterCommit()) { - remoteDirectory.deleteStaleSegmentsAsync(LAST_N_METADATA_FILES_TO_KEEP); + remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles()); } try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index c7d0d5713925f..7b57fabdf1486 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -727,6 +727,12 @@ public Map getSegmentsUploadedToRemoteStore() { * @throws IOException in case of I/O error while reading from / writing to remote segment store */ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + if (lastNMetadataFilesToKeep == -1) { + logger.info( + "Stale segment deletion is disabled if cluster.remote_store.index.segment_metadata.retention.max_count is set to -1" + ); + return; + } List sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( MetadataFilenameUtils.METADATA_PREFIX, Integer.MAX_VALUE diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 6bfcb8d9fa3d0..4232d32987e86 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -247,7 +247,18 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime); + // ToDo: This is a temporary mitigation to not fail the peer recovery flow in case there is + // an exception while downloading segments from remote store. For remote backed indexes, we + // plan to revamp this flow so that node-node segment copy will not happen. + // GitHub Issue to track the revamp: https://github.com/opensearch-project/OpenSearch/issues/11331 + try { + indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime); + } catch (Exception e) { + logger.error( + "Exception while downloading segment files from remote store, will continue with peer to peer segment copy", + e + ); + } } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index d91bfc19ee833..5351ae7fe08dd 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -157,6 +157,25 @@ public class RecoverySettings { Property.NodeScope ); + /** + * Controls minimum number of metadata files to keep in remote segment store. + * {@code value < 1} will disable deletion of stale segment metadata files. + */ + public static final Setting CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING = Setting.intSetting( + "cluster.remote_store.index.segment_metadata.retention.max_count", + 10, + -1, + v -> { + if (v == 0) { + throw new IllegalArgumentException( + "Value 0 is not allowed for this setting as it would delete all the data from remote segment store" + ); + } + }, + Property.NodeScope, + Property.Dynamic + ); + // choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1. public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES); @@ -171,6 +190,7 @@ public class RecoverySettings { private volatile TimeValue internalActionTimeout; private volatile TimeValue internalActionRetryTimeout; private volatile TimeValue internalActionLongTimeout; + private volatile int minRemoteSegmentMetadataFiles; private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; @@ -212,6 +232,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this::setInternalActionLongTimeout ); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout); + minRemoteSegmentMetadataFiles = CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, + this::setMinRemoteSegmentMetadataFiles + ); } public RateLimiter rateLimiter() { @@ -307,4 +332,12 @@ public int getMaxConcurrentRemoteStoreStreams() { private void setMaxConcurrentRemoteStoreStreams(int maxConcurrentRemoteStoreStreams) { this.maxConcurrentRemoteStoreStreams = maxConcurrentRemoteStoreStreams; } + + private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles) { + this.minRemoteSegmentMetadataFiles = minRemoteSegmentMetadataFiles; + } + + public int getMinRemoteSegmentMetadataFiles() { + return this.minRemoteSegmentMetadataFiles; + } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 51814283c5eb3..74da9a3fff19c 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -33,6 +33,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils; import org.opensearch.index.store.Store; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; @@ -237,7 +238,7 @@ public void testAfterMultipleCommits() throws IOException { setup(true, 3); assertDocs(indexShard, "1", "2", "3"); - for (int i = 0; i < RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP + 3; i++) { + for (int i = 0; i < indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles() + 3; i++) { indexDocs(4 * (i + 1), 4); flushShard(indexShard); } @@ -550,6 +551,9 @@ private Tuple mockIn RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = indexShard.getRemoteStoreStatsTrackerFactory(); when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); when(shard.shardId()).thenReturn(indexShard.shardId()); + RecoverySettings recoverySettings = mock(RecoverySettings.class); + when(recoverySettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10); + when(shard.getRecoverySettings()).thenReturn(recoverySettings); RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker); refreshListener.afterRefresh(true); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index 75639661f539d..18e7dfb375132 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -96,4 +96,49 @@ public void testInternalLongActionTimeout() { ); assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout()); } + + public void testSegmentMetadataRetention() { + // Default value + assertEquals(10, recoverySettings.getMinRemoteSegmentMetadataFiles()); + + // Setting value < default (10) + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), 5).build() + ); + assertEquals(5, recoverySettings.getMinRemoteSegmentMetadataFiles()); + + // Setting min value + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1).build() + ); + assertEquals(-1, recoverySettings.getMinRemoteSegmentMetadataFiles()); + + // Setting value > default (10) + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), 15).build() + ); + assertEquals(15, recoverySettings.getMinRemoteSegmentMetadataFiles()); + + // Setting value to 0 should fail and retain the existing value + assertThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), 0) + .build() + ) + ); + assertEquals(15, recoverySettings.getMinRemoteSegmentMetadataFiles()); + + // Setting value < -1 should fail and retain the existing value + assertThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -5) + .build() + ) + ); + assertEquals(15, recoverySettings.getMinRemoteSegmentMetadataFiles()); + } }