diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java new file mode 100644 index 0000000000000..633bf4cb849be --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -0,0 +1,182 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; +import org.opensearch.common.io.FileSystemUtils; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; +import org.opensearch.test.FeatureFlagSetter; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreRefreshListenerIT extends 
AbstractSnapshotIntegTestCase { + + private static final String REPOSITORY_NAME = "my-segment-repo-1"; + private static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .put(FeatureFlags.REMOTE_STORE, "true") + .build(); + } + + @Before + public void setup() { + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + } + + @Override + public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } + + private Settings remoteStoreIndexSettings(int numberOfReplicas) { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + + @After + public void teardown() { + logger.info("--> Deleting the repository={}", REPOSITORY_NAME); + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + public void testRemoteRefreshRetryOnFailure() throws Exception { + + Path location = randomRepoPath().toAbsolutePath(); + setup(location, randomDoubleBetween(0.1, 0.25, true), "metadata"); + + // Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed + // due to IOExceptions that are thrown while doing uploadBlobs. 
+ indexData(randomIntBetween(5, 10), randomBoolean()); + logger.info("--> Indexed data"); + + // TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts. + IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get(); + assertEquals(1, response.getShards().length); + + String indexUuid = response.getShards()[0].getShardRouting().index().getUUID(); + Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid)); + String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid); + + logger.info("--> Verify that the segment files are same on local and repository eventually"); + // This can take time: retries back off exponentially, capped at REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS in RemoteStoreRefreshListener. + assertBusy(() -> { + Set filesInLocal = getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath)); + Set filesInRepo = getSegmentFiles(segmentDataRepoPath); + assertTrue(filesInRepo.containsAll(filesInLocal)); + }, 60, TimeUnit.SECONDS); + } + + private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) { + logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation); + // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store result in + // IOException. The skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensure that the + // repository creation can happen without failure.
+ createRepository( + REPOSITORY_NAME, + "mock", + Settings.builder() + .put("location", repoLocation) + .put("random_control_io_exception_rate", ioFailureRate) + .put("skip_exception_on_verification_file", true) + .put("skip_exception_on_list_blobs", true) + .put("max_failure_number", Long.MAX_VALUE) + ); + + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME); + logger.info("--> Created index={}", INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + logger.info("--> Cluster is yellow with no initializing shards"); + ensureGreen(INDEX_NAME); + logger.info("--> Cluster is green"); + } + + /** + * Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc. + * + * @param location the path to location where segment files are being searched. + * @return set of file names of all segment file or empty set if there was IOException thrown. + */ + private Set getSegmentFiles(Path location) { + try { + return Arrays.stream(FileSystemUtils.files(location)) + .filter(path -> path.getFileName().toString().startsWith("_")) + .map(path -> path.getFileName().toString()) + .map(this::getLocalSegmentFilename) + .collect(Collectors.toSet()); + } catch (IOException exception) { + logger.error("Exception occurred while getting segment files", exception); + } + return Collections.emptySet(); + } + + private String getLocalSegmentFilename(String remoteFilename) { + return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0]; + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + private void indexData(int numberOfIterations, boolean invokeFlush) { + logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush); + for (int i = 0; i < numberOfIterations; i++) { + int numberOfOperations = randomIntBetween(1, 5); + logger.info("--> 
Indexing {} operations in iteration #{}", numberOfOperations, i); + for (int j = 0; j < numberOfOperations; j++) { + indexSingleDoc(); + } + if (invokeFlush) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index ee3b392472fa0..6509387efcc42 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -20,21 +20,29 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.unit.TimeValue; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -46,6 +54,27 @@ * @opensearch.internal */ public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { + + private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); + + /** + * The initial retry interval at which the retry 
job gets scheduled after a failure. + */ + private static final int REMOTE_REFRESH_RETRY_BASE_INTERVAL_MILLIS = 1_000; + + /** + * In an exponential back off setup, the maximum retry interval after the retry interval increases exponentially. + */ + private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 10_000; + + /** + * Exponential back off policy with max retry interval. + */ + private static final BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialEqualJitterBackoff( + REMOTE_REFRESH_RETRY_BASE_INTERVAL_MILLIS, + REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS + ); + // Visible for testing static final Set EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing @@ -57,7 +86,15 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final RemoteSegmentStoreDirectory remoteDirectory; private final Map localSegmentChecksumMap; private long primaryTerm; - private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); + + /** + * Semaphore that ensures there is only 1 retry scheduled at any time. 
+ */ + private final Semaphore SCHEDULE_RETRY_PERMITS = new Semaphore(1); + + private volatile Iterator backoffDelayIterator; + + private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry; public RemoteStoreRefreshListener(IndexShard indexShard) { this.indexShard = indexShard; @@ -66,13 +103,14 @@ public RemoteStoreRefreshListener(IndexShard indexShard) { .getDelegate()).getDelegate(); this.primaryTerm = indexShard.getOperationPrimaryTerm(); localSegmentChecksumMap = new HashMap<>(); - if (indexShard.shardRouting.primary()) { + if (indexShard.routingEntry().primary()) { try { this.remoteDirectory.init(); } catch (IOException e) { logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); } } + resetBackOffDelayIterator(); } @Override @@ -83,93 +121,150 @@ public void beforeRefresh() throws IOException { /** * Upload new segment files created as part of the last refresh to the remote segment store. * This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded. 
+ * * @param didRefresh true if the refresh opened a new reference */ @Override public void afterRefresh(boolean didRefresh) { - synchronized (this) { - try { - if (indexShard.getReplicationTracker().isPrimaryMode()) { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); + try { + indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get(); + } catch (InterruptedException | ExecutionException e) { + logger.info("Exception occurred while scheduling syncSegments", e); + } + } + + private synchronized void syncSegments(boolean isRetry) { + boolean shouldRetry = false; + beforeSegmentsSync(isRetry); + try { + if (indexShard.getReplicationTracker().isPrimaryMode()) { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + this.remoteDirectory.init(); + } + try { + // if a new segments_N file is present in local that is not uploaded to remote store yet, it + // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. + // This is done to avoid delete post each refresh. + // Ideally, we want this to be done in async flow. (GitHub issue #4315) + if (isRefreshAfterCommit()) { + deleteStaleCommits(); } - try { - // if a new segments_N file is present in local that is not uploaded to remote store yet, it - // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. - // This is done to avoid delete post each refresh. - // Ideally, we want this to be done in async flow. 
(GitHub issue #4315) - if (isRefreshAfterCommit()) { - deleteStaleCommits(); - } - String segmentInfoSnapshotFilename = null; - try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { - SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); - - Collection localSegmentsPostRefresh = segmentInfos.files(true); - - List segmentInfosFiles = localSegmentsPostRefresh.stream() - .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) - .collect(Collectors.toList()); - Optional latestSegmentInfos = segmentInfosFiles.stream() - .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); - - if (latestSegmentInfos.isPresent()) { - // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain - // all the segments from last commit if they are merged away but not yet committed. - // Each metadata file in the remote segment store represents a commit and the following - // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed - // segments. - localSegmentsPostRefresh.addAll( - SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true) + String segmentInfoSnapshotFilename = null; + try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + + Collection localSegmentsPostRefresh = segmentInfos.files(true); + + List segmentInfosFiles = localSegmentsPostRefresh.stream() + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .collect(Collectors.toList()); + Optional latestSegmentInfos = segmentInfosFiles.stream() + .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); + + if (latestSegmentInfos.isPresent()) { + // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain + // all the segments from last commit if they are merged away but not yet committed. 
+ // Each metadata file in the remote segment store represents a commit and the following + // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed + // segments. + localSegmentsPostRefresh.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); + segmentInfosFiles.stream() + .filter(file -> !file.equals(latestSegmentInfos.get())) + .forEach(localSegmentsPostRefresh::remove); + + boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh); + if (uploadStatus) { + segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); + localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); + + remoteDirectory.uploadMetadata( + localSegmentsPostRefresh, + storeDirectory, + indexShard.getOperationPrimaryTerm(), + segmentInfos.getGeneration() ); - segmentInfosFiles.stream() - .filter(file -> !file.equals(latestSegmentInfos.get())) - .forEach(localSegmentsPostRefresh::remove); - - boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh); - if (uploadStatus) { - segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); - localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); - - remoteDirectory.uploadMetadata( - localSegmentsPostRefresh, - storeDirectory, - indexShard.getOperationPrimaryTerm(), - segmentInfos.getGeneration() - ); - localSegmentChecksumMap.keySet() - .stream() - .filter(file -> !localSegmentsPostRefresh.contains(file)) - .collect(Collectors.toSet()) - .forEach(localSegmentChecksumMap::remove); - final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()) - .lastRefreshedCheckpoint(); - ((InternalEngine) indexShard.getEngine()).translogManager() - .setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); - } + localSegmentChecksumMap.keySet() + .stream() + .filter(file -> !localSegmentsPostRefresh.contains(file)) + .collect(Collectors.toSet()) + 
+ .forEach(localSegmentChecksumMap::remove); + onSuccessfulSegmentsSync(); + final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); + ((InternalEngine) indexShard.getEngine()).translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); + } else { + shouldRetry = true; } - } catch (EngineException e) { - logger.warn("Exception while reading SegmentInfosSnapshot", e); - } finally { - try { - if (segmentInfoSnapshotFilename != null) { - storeDirectory.deleteFile(segmentInfoSnapshotFilename); - } - } catch (IOException e) { - logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); + } + } catch (EngineException e) { + shouldRetry = true; + logger.warn("Exception while reading SegmentInfosSnapshot", e); + } finally { + try { + if (segmentInfoSnapshotFilename != null) { + storeDirectory.deleteFile(segmentInfoSnapshotFilename); } + } catch (IOException e) { + logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); } - } catch (IOException e) { - // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried - // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. - logger.warn("Exception while uploading new segments to the remote segment store", e); } + } catch (IOException e) { + shouldRetry = true; + // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried + // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.
+ logger.warn("Exception while uploading new segments to the remote segment store", e); } - } catch (Throwable t) { - logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); } + } catch (Throwable t) { + shouldRetry = true; + logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); + } + afterSegmentsSync(isRetry, shouldRetry); + } + + private void beforeSegmentsSync(boolean isRetry) { + if (isRetry) { + logger.info("Retrying to sync the segments to remote store"); + } + } + + private void onSuccessfulSegmentsSync() { + // Reset the backoffDelayIterator so that future failures start again from the base delay + resetBackOffDelayIterator(); + // Cancel the scheduled cancellable retry if possible and set it to null + cancelAndResetScheduledCancellableRetry(); + } + + /** + * Cancels the scheduled retry if there is one scheduled, and it has not started yet. Clears the reference as the + * scheduled retry has been cancelled, or it was null in the first place, or it is running/ran already. + */ + private void cancelAndResetScheduledCancellableRetry() { + if (scheduledCancellableRetry != null && scheduledCancellableRetry.getDelay(TimeUnit.NANOSECONDS) > 0) { + scheduledCancellableRetry.cancel(); + } + scheduledCancellableRetry = null; + } + + /** + * Resets the backoff delay iterator so that the next set of failures starts with the base delay and goes up to the max delay. + */ + private void resetBackOffDelayIterator() { + backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator(); + } + + private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) { + // If this was a retry attempt, then we release the semaphore at the end so that further retries can be scheduled + if (isRetry) { + SCHEDULE_RETRY_PERMITS.release(); + } + + // If there are failures in uploading segments, then we should retry as search idle can lead to + // refresh not occurring until write happens.
+ if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && SCHEDULE_RETRY_PERMITS.tryAcquire()) { + scheduledCancellableRetry = indexShard.getThreadPool() + .schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH); } } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 74c5c6055a889..987f38e8dd8fd 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -111,6 +111,7 @@ public static class Names { public static final String TRANSLOG_TRANSFER = "translog_transfer"; public static final String TRANSLOG_SYNC = "translog_sync"; public static final String REMOTE_PURGE = "remote_purge"; + public static final String REMOTE_REFRESH = "remote_refresh"; public static final String INDEX_SEARCHER = "index_searcher"; } @@ -179,6 +180,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING); map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED); map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING); + map.put(Names.REMOTE_REFRESH, ThreadPoolType.SCALING); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { map.put(Names.INDEX_SEARCHER, ThreadPoolType.FIXED); } @@ -272,6 +274,10 @@ public ThreadPool( ); builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000)); builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); + builders.put( + Names.REMOTE_REFRESH, + new ScalingExecutorBuilder(Names.REMOTE_REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) + ); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { builders.put(Names.INDEX_SEARCHER, new FixedExecutorBuilder(settings, Names.INDEX_SEARCHER, 
allocatedProcessors, 1000, false)); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index c9b8c023e26aa..84848bb87d634 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -29,8 +29,13 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX; public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { @@ -58,9 +63,11 @@ private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @After public void tearDown() throws Exception { - Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - closeShards(indexShard); + if (indexShard != null) { + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + closeShards(indexShard); + } super.tearDown(); } @@ -204,6 +211,127 @@ public void onFailure(Exception e) { verifyUploadedSegments(remoteSegmentStoreDirectory); } + public void testRefreshSuccessOnFirstAttempt() throws Exception { + // This is the case of isRetry=false, shouldRetry=false + // Succeed on 1st attempt + int succeedOnAttempt = 1; + // We spy on IndexShard.getReplicationTracker() to validate that we have 
tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); + // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. + // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down + CountDownLatch successLatch = new CountDownLatch(3); + mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(0, successLatch.getCount())); + } + + public void testRefreshSuccessOnSecondAttempt() throws Exception { + // This covers 2 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false + // Succeed on 2nd attempt + int succeedOnAttempt = 2; + // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); + // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. 
+ // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down + CountDownLatch successLatch = new CountDownLatch(3); + mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(0, successLatch.getCount())); + } + + public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception { + // This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true + // Succeed on 3rd attempt + int succeedOnAttempt = 3; + // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); + // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. 
+ // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down + CountDownLatch successLatch = new CountDownLatch(3); + mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(0, successLatch.getCount())); + } + + private void mockIndexShardWithRetryAndScheduleRefresh( + int succeedOnAttempt, + CountDownLatch refreshCountLatch, + CountDownLatch successLatch + ) throws IOException { + // Create index shard that we will be using to mock different methods in IndexShard for the unit test + indexShard = newStartedShard( + true, + Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + new InternalEngineFactory() + ); + + indexDocs(1, randomIntBetween(1, 100)); + + // Mock indexShard.store().directory() + IndexShard shard = mock(IndexShard.class); + Store store = mock(Store.class); + when(shard.store()).thenReturn(store); + when(store.directory()).thenReturn(indexShard.store().directory()); + + // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) + Store remoteStore = mock(Store.class); + when(shard.remoteStore()).thenReturn(remoteStore); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) + .getDelegate(); + FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory)); + when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); + + // Mock indexShard.getOperationPrimaryTerm() + when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm()); + + // Mock indexShard.routingEntry().primary() + when(shard.routingEntry()).thenReturn(indexShard.routingEntry()); 
+ + // Mock threadpool + when(shard.getThreadPool()).thenReturn(threadPool); + + // Mock indexShard.getReplicationTracker().isPrimaryMode() + + doAnswer(invocation -> { + if (Objects.nonNull(refreshCountLatch)) { + refreshCountLatch.countDown(); + } + return indexShard.getReplicationTracker(); + }).when(shard).getReplicationTracker(); + + AtomicLong counter = new AtomicLong(); + // Mock indexShard.getSegmentInfosSnapshot() + doAnswer(invocation -> { + if (counter.incrementAndGet() <= succeedOnAttempt - 1) { + throw new RuntimeException("Inducing failure in upload"); + } + return indexShard.getSegmentInfosSnapshot(); + }).when(shard).getSegmentInfosSnapshot(); + + doAnswer(invocation -> { + if (Objects.nonNull(successLatch)) { + successLatch.countDown(); + } + return indexShard.getEngine(); + }).when(shard).getEngine(); + + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard); + refreshListener.afterRefresh(false); + } + + private static class TestFilterDirectory extends FilterDirectory { + + /** + * Sole constructor, typically called from sub-classes. 
+ * + * @param in the directory this filter delegates to + */ + protected TestFilterDirectory(Directory in) { + super(in); + } + } + private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentStoreDirectory) throws IOException { Map uploadedSegments = remoteSegmentStoreDirectory .getSegmentsUploadedToRemoteStore(); diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index d1e7e25369b12..bfa97fb1d3fe7 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -135,6 +135,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen); sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive); + sizes.put(ThreadPool.Names.REMOTE_REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen); return sizes.get(threadPoolName).apply(numberOfProcessors); } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java index 7b53c36fbacf9..0e47130e424cd 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java @@ -78,6 +78,8 @@ public class MockRepository extends FsRepository { private static final Logger logger = LogManager.getLogger(MockRepository.class); + private static final String DUMMY_FILE_NAME_LIST_BLOBS = "dummy-name-list-blobs"; + public static class Plugin extends org.opensearch.plugins.Plugin implements RepositoryPlugin { public static final Setting USERNAME_SETTING = Setting.simpleString("secret.mock.username", Property.NodeScope); @@ -116,6 +118,10 @@ public long
getFailureCount() { private final double randomDataFileIOExceptionRate; + private final boolean skipExceptionOnVerificationFile; + + private final boolean skipExceptionOnListBlobs; + private final boolean useLuceneCorruptionException; private final long maximumNumberOfFailures; @@ -174,6 +180,8 @@ public MockRepository( super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService, recoverySettings); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); + skipExceptionOnVerificationFile = metadata.settings().getAsBoolean("skip_exception_on_verification_file", false); + skipExceptionOnListBlobs = metadata.settings().getAsBoolean("skip_exception_on_list_blobs", false); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false); @@ -360,9 +368,14 @@ private int hashCode(String path) { } private void maybeIOExceptionOrBlock(String blobName) throws IOException { - if (INDEX_LATEST_BLOB.equals(blobName)) { - // Don't mess with the index.latest blob here, failures to write to it are ignored by upstream logic and we have - // specific tests that cover the error handling around this blob. + if (INDEX_LATEST_BLOB.equals(blobName) // Condition 1 + || skipExceptionOnVerificationFiles(blobName) // Condition 2 + || skipExceptionOnListBlobs(blobName)) { // Condition 3 + // Condition 1 - Don't mess with the index.latest blob here, failures to write to it are ignored by + // upstream logic and we have specific tests that cover the error handling around this blob. 
+ // Condition 2 & 3 - This condition has been added to allow creation of repository which throws IO + // exception during normal remote store operations. However, if we fail during verification as well, + // then we can not add the repository as well. return; } if (blobName.startsWith("__")) { @@ -482,7 +495,7 @@ public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOExce @Override public Map listBlobs() throws IOException { - maybeIOExceptionOrBlock(""); + maybeIOExceptionOrBlock(DUMMY_FILE_NAME_LIST_BLOBS); return super.listBlobs(); } @@ -550,5 +563,24 @@ public void writeBlobAtomic( } } } + + private boolean skipExceptionOnVerificationFiles(String blobName) { + return skipExceptionOnVerificationFile && isVerificationFile(blobName); + } + + /** + * Checks if the file name is one of the types of verification files that is created at the time of creation of + * repository. + * + * @param blobName name of the blob + * @return true if it is the file created at the time of repository creation + */ + private boolean isVerificationFile(String blobName) { + return blobName.equals("master.dat") || (blobName.startsWith("data-") && blobName.endsWith(".dat")); + } + + private boolean skipExceptionOnListBlobs(String blobName) { + return skipExceptionOnListBlobs && DUMMY_FILE_NAME_LIST_BLOBS.equals(blobName); + } } }