From 4bc21869ecb871249d45261b372294376750f543 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 9 May 2023 15:02:47 +0530 Subject: [PATCH 1/9] Add backpressure in write path on remote segments lag Signed-off-by: Ashish Singh --- .../opensearch/index/shard/IndexShardIT.java | 1 + .../RemoteStoreRefreshListenerIT.java | 41 ++- .../action/bulk/TransportShardBulkAction.java | 17 +- .../org/opensearch/index/IndexService.java | 7 +- .../RemoteRefreshSegmentPressureService.java | 11 +- .../remote/RemoteRefreshSegmentTracker.java | 80 ++--- .../opensearch/index/shard/IndexShard.java | 9 +- .../shard/RemoteStoreRefreshListener.java | 297 +++++++++++++----- .../org/opensearch/indices/IndicesModule.java | 5 + .../opensearch/indices/IndicesService.java | 12 +- .../cluster/IndicesClusterStateService.java | 24 +- .../bulk/TransportShardBulkActionTests.java | 5 + ...oteRefreshSegmentPressureServiceTests.java | 2 +- .../RemoteRefreshSegmentTrackerTests.java | 8 +- .../RemoteStoreRefreshListenerTests.java | 79 ++++- ...dicesLifecycleListenerSingleNodeTests.java | 3 +- ...actIndicesClusterStateServiceTestCase.java | 4 +- ...ClusterStateServiceRandomUpdatesTests.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 5 +- .../index/shard/IndexShardTestCase.java | 21 +- .../snapshots/mockstore/MockRepository.java | 11 +- 21 files changed, 484 insertions(+), 161 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 11f187ac6e619..ba567c125c6e9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -701,6 +701,7 @@ public static final IndexShard newIndexShard( cbs, (indexSettings, shardRouting) -> new InternalTranslogFactory(), SegmentReplicationCheckpointPublisher.EMPTY, + null, null ); } diff --git 
a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 30e370f3a528c..3833ed6e4e1bd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -8,8 +8,8 @@ package org.opensearch.remotestore; -import org.junit.After; import org.junit.Before; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.index.IndexResponse; @@ -17,6 +17,7 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.util.FileSystemUtils; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; @@ -35,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -76,8 +78,7 @@ private Settings remoteStoreIndexSettings(int numberOfReplicas) { .build(); } - @After - public void teardown() { + public void deleteRepo() { logger.info("--> Deleting the repository={}", REPOSITORY_NAME); assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); } @@ -107,6 +108,38 @@ public void testRemoteRefreshRetryOnFailure() throws Exception { Set filesInRepo = 
getSegmentFiles(segmentDataRepoPath); assertTrue(filesInRepo.containsAll(filesInLocal)); }, 60, TimeUnit.SECONDS); + deleteRepo(); + } + + public void testWritesRejected() { + Path location = randomRepoPath().toAbsolutePath(); + setup(location, 1d, "metadata"); + + Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build(); + ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(request) + .get(); + assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true"); + + logger.info("--> Indexing data"); + OpenSearchRejectedExecutionException ex = assertThrows( + OpenSearchRejectedExecutionException.class, + () -> indexData(randomIntBetween(10, 20), randomBoolean()) + ); + assertTrue(ex.getMessage().contains("rejected execution on primary shard")); + deleteRepo(); + } + + public void testRemoteRefreshSegmentPressureSettingChanged() { + Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build(); + ClusterUpdateSettingsResponse response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get(); + assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true"); + + request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false).build(); + response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get(); + assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "false"); } private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) { @@ -122,6 +155,8 @@ private void setup(Path repoLocation, double ioFailureRate, String skipException .put("random_control_io_exception_rate", ioFailureRate) .put("skip_exception_on_verification_file", 
true) .put("skip_exception_on_list_blobs", true) + // Skipping is required for metadata as it is part of recovery + .put("skip_exception_on_blobs", skipExceptionBlobList) .put("max_failure_number", Long.MAX_VALUE) ); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 6af512b011653..8ca90ab7ca577 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -76,6 +76,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.common.xcontent.XContentHelper; @@ -88,6 +89,7 @@ import org.opensearch.index.mapper.MapperException; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; @@ -135,6 +137,7 @@ public class TransportShardBulkAction extends TransportWriteAction globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -506,7 +508,8 @@ public synchronized IndexShard createShard( circuitBreakerService, translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? 
checkpointPublisher : null, - remoteStore + remoteStore, + remoteRefreshSegmentPressureService ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java index 37935cc0eb29d..0ce5acc5abd41 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java @@ -87,11 +87,10 @@ public void afterIndexShardCreated(IndexShard indexShard) { @Override public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) { - if (indexShard.indexSettings().isRemoteStoreEnabled() == false) { - return; + RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = trackerMap.remove(shardId); + if (remoteRefreshSegmentTracker != null) { + logger.trace("Deleted tracker for shardId={}", shardId); } - trackerMap.remove(shardId); - logger.trace("Deleted tracker for shardId={}", shardId); } /** @@ -105,6 +104,10 @@ public boolean isSegmentsUploadBackpressureEnabled() { public void validateSegmentsUploadLag(ShardId shardId) { RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId); + // This will be null for non-remote backed indexes + if (remoteRefreshSegmentTracker == null) { + return; + } // Check if refresh checkpoint (a.k.a. seq number) lag is 2 or below - this is to handle segment merges that can // increase the bytes to upload almost suddenly. 
if (remoteRefreshSegmentTracker.getRefreshSeqNoLag() <= 1) { diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index 109eadf34509b..b595ef600ba7a 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -169,28 +169,28 @@ ShardId getShardId() { return shardId; } - long getLocalRefreshSeqNo() { + public long getLocalRefreshSeqNo() { return localRefreshSeqNo; } - void updateLocalRefreshSeqNo(long localRefreshSeqNo) { - assert localRefreshSeqNo > this.localRefreshSeqNo : "newLocalRefreshSeqNo=" + public void updateLocalRefreshSeqNo(long localRefreshSeqNo) { + assert localRefreshSeqNo >= this.localRefreshSeqNo : "newLocalRefreshSeqNo=" + localRefreshSeqNo - + ">=" + + " < " + "currentLocalRefreshSeqNo=" + this.localRefreshSeqNo; this.localRefreshSeqNo = localRefreshSeqNo; computeRefreshSeqNoLag(); } - long getLocalRefreshTimeMs() { + public long getLocalRefreshTimeMs() { return localRefreshTimeMs; } - void updateLocalRefreshTimeMs(long localRefreshTimeMs) { - assert localRefreshTimeMs > this.localRefreshTimeMs : "newLocalRefreshTimeMs=" + public void updateLocalRefreshTimeMs(long localRefreshTimeMs) { + assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs=" + localRefreshTimeMs - + ">=" + + " < " + "currentLocalRefreshTimeMs=" + this.localRefreshTimeMs; this.localRefreshTimeMs = localRefreshTimeMs; @@ -201,10 +201,10 @@ long getRemoteRefreshSeqNo() { return remoteRefreshSeqNo; } - void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) { - assert remoteRefreshSeqNo > this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo=" + public void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) { + assert remoteRefreshSeqNo >= this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo=" + remoteRefreshSeqNo - + ">=" 
+ + " < " + "currentRemoteRefreshSeqNo=" + this.remoteRefreshSeqNo; this.remoteRefreshSeqNo = remoteRefreshSeqNo; @@ -215,10 +215,10 @@ long getRemoteRefreshTimeMs() { return remoteRefreshTimeMs; } - void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { - assert remoteRefreshTimeMs > this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs=" + public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { + assert remoteRefreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs=" + remoteRefreshTimeMs - + ">=" + + " < " + "currentRemoteRefreshTimeMs=" + this.remoteRefreshTimeMs; this.remoteRefreshTimeMs = remoteRefreshTimeMs; @@ -229,7 +229,7 @@ private void computeRefreshSeqNoLag() { refreshSeqNoLag = localRefreshSeqNo - remoteRefreshSeqNo; } - long getRefreshSeqNoLag() { + public long getRefreshSeqNoLag() { return refreshSeqNoLag; } @@ -237,73 +237,73 @@ private void computeTimeMsLag() { timeMsLag = localRefreshTimeMs - remoteRefreshTimeMs; } - long getTimeMsLag() { + public long getTimeMsLag() { return timeMsLag; } - long getBytesLag() { + public long getBytesLag() { return bytesLag; } - long getUploadBytesStarted() { + public long getUploadBytesStarted() { return uploadBytesStarted; } - void addUploadBytesStarted(long size) { + public void addUploadBytesStarted(long size) { uploadBytesStarted += size; } - long getUploadBytesFailed() { + public long getUploadBytesFailed() { return uploadBytesFailed; } - void addUploadBytesFailed(long size) { + public void addUploadBytesFailed(long size) { uploadBytesFailed += size; } - long getUploadBytesSucceeded() { + public long getUploadBytesSucceeded() { return uploadBytesSucceeded; } - void addUploadBytesSucceeded(long size) { + public void addUploadBytesSucceeded(long size) { uploadBytesSucceeded += size; } - long getInflightUploadBytes() { + public long getInflightUploadBytes() { return uploadBytesStarted - uploadBytesFailed - uploadBytesSucceeded; } - long getTotalUploadsStarted() { + public long 
getTotalUploadsStarted() { return totalUploadsStarted; } - void incrementTotalUploadsStarted() { + public void incrementTotalUploadsStarted() { totalUploadsStarted += 1; } - long getTotalUploadsFailed() { + public long getTotalUploadsFailed() { return totalUploadsFailed; } - void incrementTotalUploadsFailed() { + public void incrementTotalUploadsFailed() { totalUploadsFailed += 1; failures.record(true); } - long getTotalUploadsSucceeded() { + public long getTotalUploadsSucceeded() { return totalUploadsSucceeded; } - void incrementTotalUploadSucceeded() { + public void incrementTotalUploadsSucceeded() { totalUploadsSucceeded += 1; failures.record(false); } - long getInflightUploads() { + public long getInflightUploads() { return totalUploadsStarted - totalUploadsFailed - totalUploadsSucceeded; } - long getRejectionCount() { + public long getRejectionCount() { return rejectionCount.get(); } @@ -323,16 +323,22 @@ Map getLatestLocalFileNameLengthMap() { return latestLocalFileNameLengthMap; } - void setLatestLocalFileNameLengthMap(Map latestLocalFileNameLengthMap) { + public void setLatestLocalFileNameLengthMap(Map latestLocalFileNameLengthMap) { this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap; computeBytesLag(); } - void addToLatestUploadFiles(String file) { + public void addToLatestUploadFiles(String file) { this.latestUploadFiles.add(file); computeBytesLag(); } + public void setLatestUploadFiles(Set files) { + this.latestUploadFiles.clear(); + this.latestUploadFiles.addAll(files); + computeBytesLag(); + } + private void computeBytesLag() { if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) { return; @@ -356,7 +362,7 @@ boolean isUploadBytesAverageReady() { return uploadBytesMovingAverageReference.get().getAverage(); } - void addUploadBytes(long size) { + public void addUploadBytes(long size) { synchronized (uploadBytesMutex) { this.uploadBytesMovingAverageReference.get().record(size); } @@ -381,7 +387,7 @@ boolean 
isUploadBytesPerSecAverageReady() { return uploadBytesPerSecMovingAverageReference.get().getAverage(); } - void addUploadBytesPerSec(long bytesPerSec) { + public void addUploadBytesPerSec(long bytesPerSec) { synchronized (uploadBytesPerSecMutex) { this.uploadBytesPerSecMovingAverageReference.get().record(bytesPerSec); } @@ -406,7 +412,7 @@ boolean isUploadTimeMsAverageReady() { return uploadTimeMsMovingAverageReference.get().getAverage(); } - void addUploadTimeMs(long timeMs) { + public void addUploadTimeMs(long timeMs) { synchronized (uploadTimeMsMutex) { this.uploadTimeMsMovingAverageReference.get().record(timeMs); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f2aa886aed374..94282f673398d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -143,6 +143,7 @@ import org.opensearch.index.merge.MergeStats; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.seqno.ReplicationTracker; @@ -328,8 +329,8 @@ Runnable getGlobalCheckpointSyncer() { private volatile boolean useRetentionLeasesInPeerRecovery; private final Store remoteStore; private final BiFunction translogFactorySupplier; - private final boolean isTimeSeriesIndex; + private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; public IndexShard( final ShardRouting shardRouting, @@ -354,7 +355,8 @@ public IndexShard( final CircuitBreakerService circuitBreakerService, final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, - @Nullable final Store remoteStore + @Nullable 
final Store remoteStore, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -449,6 +451,7 @@ public boolean shouldCache(Query query) { this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null) ? false : mapperService.documentMapper().mappers().containsTimeStampField(); + this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; } public ThreadPool getThreadPool() { @@ -3546,7 +3549,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro final List internalRefreshListener = new ArrayList<>(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); if (isRemoteStoreEnabled()) { - internalRefreshListener.add(new RemoteStoreRefreshListener(this)); + internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteRefreshSegmentPressureService)); } if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 8672ba6c59a13..57d292f7ae369 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -25,6 +25,8 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; 
import org.opensearch.threadpool.Scheduler; @@ -44,6 +46,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; @@ -84,6 +87,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final IndexShard indexShard; private final Directory storeDirectory; private final RemoteSegmentStoreDirectory remoteDirectory; + private final RemoteRefreshSegmentPressureService pressureService; private final Map localSegmentChecksumMap; private long primaryTerm; @@ -96,7 +100,17 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry; - public RemoteStoreRefreshListener(IndexShard indexShard) { + /** + * This keeps the pending refresh time which is set before the refresh. This is used to set the last refresh time. + */ + private volatile long pendingRefreshTimeMs; + + /** + * Keeps track of segment files and their size in bytes which are part of the most recent refresh. 
+ */ + private final Map latestFileNameSizeOnLocalMap = new HashMap<>(); + + public RemoteStoreRefreshListener(IndexShard indexShard, RemoteRefreshSegmentPressureService pressureService) { this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -110,12 +124,14 @@ public RemoteStoreRefreshListener(IndexShard indexShard) { logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); } } + this.pressureService = pressureService; resetBackOffDelayIterator(); } @Override public void beforeRefresh() throws IOException { - // Do Nothing + // Set the pending refresh time which gets set on account of success refresh. + pendingRefreshTimeMs = System.nanoTime() / 1_000_000L; } /** @@ -126,6 +142,11 @@ public void beforeRefresh() throws IOException { */ @Override public void afterRefresh(boolean didRefresh) { + + if (didRefresh) { + updateLocalRefreshTimeAndSeqNo(); + } + try { indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get(); } catch (InterruptedException | ExecutionException e) { @@ -134,95 +155,122 @@ public void afterRefresh(boolean didRefresh) { } private synchronized void syncSegments(boolean isRetry) { - boolean shouldRetry = false; + if (indexShard.getReplicationTracker().isPrimaryMode() == false) { + return; + } beforeSegmentsSync(isRetry); + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); + long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime(); + boolean shouldRetry = true; try { - if (indexShard.getReplicationTracker().isPrimaryMode()) { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - 
this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); + // Start tracking total uploads started + segmentTracker.incrementTotalUploadsStarted(); + + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + this.remoteDirectory.init(); + } + try { + // if a new segments_N file is present in local that is not uploaded to remote store yet, it + // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. + // This is done to avoid delete post each refresh. + // Ideally, we want this to be done in async flow. (GitHub issue #4315) + if (isRefreshAfterCommit()) { + deleteStaleCommits(); } - try { - // if a new segments_N file is present in local that is not uploaded to remote store yet, it - // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. - // This is done to avoid delete post each refresh. - // Ideally, we want this to be done in async flow. (GitHub issue #4315) - if (isRefreshAfterCommit()) { - deleteStaleCommits(); - } - String segmentInfoSnapshotFilename = null; - try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { - SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); - - Collection localSegmentsPostRefresh = segmentInfos.files(true); - - List segmentInfosFiles = localSegmentsPostRefresh.stream() - .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) - .collect(Collectors.toList()); - Optional latestSegmentInfos = segmentInfosFiles.stream() - .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); - - if (latestSegmentInfos.isPresent()) { - // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain - // all the segments from last commit if they are merged away but not yet committed. 
- // Each metadata file in the remote segment store represents a commit and the following - // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed - // segments. - localSegmentsPostRefresh.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); - segmentInfosFiles.stream() - .filter(file -> !file.equals(latestSegmentInfos.get())) - .forEach(localSegmentsPostRefresh::remove); - - boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh); - if (uploadStatus) { - segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); - localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); - - remoteDirectory.uploadMetadata( - localSegmentsPostRefresh, - storeDirectory, - indexShard.getOperationPrimaryTerm(), - segmentInfos.getGeneration() - ); - localSegmentChecksumMap.keySet() - .stream() - .filter(file -> !localSegmentsPostRefresh.contains(file)) - .collect(Collectors.toSet()) - .forEach(localSegmentChecksumMap::remove); - OnSuccessfulSegmentsSync(); - final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); - indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); - } else { - shouldRetry = true; - } + String segmentInfoSnapshotFilename = null; + try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + + Collection localSegmentsPostRefresh = segmentInfos.files(true); + + List segmentInfosFiles = localSegmentsPostRefresh.stream() + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .collect(Collectors.toList()); + Optional latestSegmentInfos = segmentInfosFiles.stream() + .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); + + if (latestSegmentInfos.isPresent()) { + // SegmentInfosSnapshot is a snapshot of reader's view of segments 
and may not contain + // all the segments from last commit if they are merged away but not yet committed. + // Each metadata file in the remote segment store represents a commit and the following + // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed + // segments. + localSegmentsPostRefresh.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); + segmentInfosFiles.stream() + .filter(file -> !file.equals(latestSegmentInfos.get())) + .forEach(localSegmentsPostRefresh::remove); + + // Create a map of file name to size and update the refresh segment tracker + Map currentLocalSizeMap = createSizeMap(localSegmentsPostRefresh); + segmentTracker.setLatestLocalFileNameLengthMap(currentLocalSizeMap); + + // Start the segments files upload + boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh, segmentTracker, currentLocalSizeMap); + if (newSegmentsUploadStatus) { + segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); + localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); + + // Start metadata file upload + remoteDirectory.uploadMetadata( + localSegmentsPostRefresh, + storeDirectory, + indexShard.getOperationPrimaryTerm(), + segmentInfos.getGeneration() + ); + // Update latest uploaded segment files name in segment tracker + segmentTracker.setLatestUploadFiles(currentLocalSizeMap.keySet()); + // Update the remote refresh time and refresh seq no + updateRemoteRefreshTimeAndSeqNo(segmentTracker, refreshTimeMs, refreshSeqNo); + clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); + OnSuccessfulSegmentsSync(); + final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); + indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); + // At this point since we have uploaded new segments, segment infos and segment metadata file, + 
// along with marking minSeqNoToKeep, upload has succeeded completely. + shouldRetry = false; } - } catch (EngineException e) { - shouldRetry = true; - logger.warn("Exception while reading SegmentInfosSnapshot", e); - } finally { - try { - if (segmentInfoSnapshotFilename != null) { - storeDirectory.deleteFile(segmentInfoSnapshotFilename); - } - } catch (IOException e) { - logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); + } + } catch (EngineException e) { + logger.warn("Exception while reading SegmentInfosSnapshot", e); + } finally { + try { + if (segmentInfoSnapshotFilename != null) { + storeDirectory.deleteFile(segmentInfoSnapshotFilename); } + } catch (IOException e) { + logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); } - } catch (IOException e) { - shouldRetry = true; - // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried - // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. - logger.warn("Exception while uploading new segments to the remote segment store", e); } + } catch (IOException e) { + // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried + // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. + logger.warn("Exception while uploading new segments to the remote segment store", e); } } catch (Throwable t) { - shouldRetry = true; logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); + } finally { + // Update the segment tracker with the final upload status as seen at the end + updateFinalUploadStatusInSegmentTracker(segmentTracker, shouldRetry == false, bytesBeforeUpload, startTimeInNS); } afterSegmentsSync(isRetry, shouldRetry); } + /** + * // Clears the stale files from the latest local segment checksum map. 
+ * + * @param localSegmentsPostRefresh list of segment files present post refresh + */ + private void clearStaleFilesFromLocalSegmentChecksumMap(Collection localSegmentsPostRefresh) { + localSegmentChecksumMap.keySet() + .stream() + .filter(file -> !localSegmentsPostRefresh.contains(file)) + .collect(Collectors.toSet()) + .forEach(localSegmentChecksumMap::remove); + } + private void beforeSegmentsSync(boolean isRetry) { if (isRetry) { logger.info("Retrying to sync the segments to remote store"); @@ -293,9 +341,14 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s } // Visible for testing - boolean uploadNewSegments(Collection localFiles) throws IOException { + boolean uploadNewSegments( + Collection localSegmentsPostRefresh, + RemoteRefreshSegmentTracker segmentTracker, + Map sizeMap + ) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); - localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { + // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. 
+ List filesToUpload = localSegmentsPostRefresh.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { try { return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); } catch (IOException e) { @@ -305,13 +358,31 @@ boolean uploadNewSegments(Collection localFiles) throws IOException { ); return true; } - }).forEach(file -> { + }).collect(Collectors.toList()); + + // Start tracking the upload bytes started + filesToUpload.forEach(file -> segmentTracker.addUploadBytesStarted(sizeMap.get(file))); + + // Starting the uploads now + filesToUpload.forEach(file -> { + boolean success = false; + long fileSize = sizeMap.get(file); try { + // Start upload to remote store remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); + // Upload succeeded + segmentTracker.addUploadBytesSucceeded(fileSize); + segmentTracker.addToLatestUploadFiles(file); + success = true; } catch (IOException e) { - uploadSuccess.set(false); // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); + } finally { + if (success == false) { + uploadSuccess.set(false); + // Upload failed + segmentTracker.addUploadBytesFailed(fileSize); + } } }); return uploadSuccess.get(); @@ -334,4 +405,70 @@ private void deleteStaleCommits() { logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); } } + + /** + * Updates the last refresh time and refresh seq no which is seen by local store. 
+ */ + private void updateLocalRefreshTimeAndSeqNo() { + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + segmentTracker.updateLocalRefreshTimeMs(pendingRefreshTimeMs); + segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1); + } + + /** + * Updates the last refresh time and refresh seq no which is seen by remote store. + */ + private void updateRemoteRefreshTimeAndSeqNo(RemoteRefreshSegmentTracker segmentTracker, long refreshTimeMs, long refreshSeqNo) { + segmentTracker.updateRemoteRefreshTimeMs(refreshTimeMs); + segmentTracker.updateRemoteRefreshSeqNo(refreshSeqNo); + } + + /** + * Returns map of file name to size of the input segment files. Tries to reuse existing information by caching the size + * data, otherwise uses {@code storeDirectory.fileLength(file)} to get the size. This method also removes from the map + * such files that are not present in the list of segment files given in the input. 
+ * + * @param segmentFiles list of segment files for which size needs to be known + * @return the map as mentioned above + */ + private Map createSizeMap(Collection segmentFiles) { + // Create a map of file name to size + Map sizeMap = segmentFiles.stream() + .filter(file -> !EXCLUDE_FILES.contains(file)) + .collect(Collectors.toMap(Function.identity(), file -> { + if (latestFileNameSizeOnLocalMap.containsKey(file)) { + return latestFileNameSizeOnLocalMap.get(file); + } + long fileSize = 0; + try { + fileSize = storeDirectory.fileLength(file); + latestFileNameSizeOnLocalMap.put(file, fileSize); + } catch (IOException e) { + logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e); + } + return fileSize; + })); + // Remove keys from the fileSizeMap that do not exist in the latest segment files + latestFileNameSizeOnLocalMap.entrySet().removeIf(entry -> sizeMap.containsKey(entry.getKey()) == false); + return sizeMap; + } + + private void updateFinalUploadStatusInSegmentTracker( + RemoteRefreshSegmentTracker statsTracker, + boolean uploadStatus, + long bytesBeforeUpload, + long startTimeInNS + ) { + if (uploadStatus) { + long bytesUploaded = statsTracker.getUploadBytesSucceeded() - bytesBeforeUpload; + long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L; + + statsTracker.incrementTotalUploadsSucceeded(); + statsTracker.addUploadBytes(bytesUploaded); + statsTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / timeTakenInMS); + statsTracker.addUploadTimeMs(timeTakenInMS); + } else { + statsTracker.incrementTotalUploadsFailed(); + } + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index a5276350d582a..b868f6aa35aee 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -37,6 +37,7 @@ import 
org.opensearch.action.admin.indices.rollover.MaxDocsCondition; import org.opensearch.action.admin.indices.rollover.MaxSizeCondition; import org.opensearch.action.resync.TransportResyncReplicationAction; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.ParseField; import org.opensearch.common.inject.AbstractModule; import org.opensearch.common.io.stream.NamedWriteableRegistry; @@ -69,6 +70,7 @@ import org.opensearch.index.mapper.SourceFieldMapper; import org.opensearch.index.mapper.TextFieldMapper; import org.opensearch.index.mapper.VersionFieldMapper; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -286,6 +288,9 @@ protected void configure() { bind(RetentionLeaseSyncer.class).asEagerSingleton(); bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); bind(SegmentReplicationPressureService.class).asEagerSingleton(); + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + bind(RemoteRefreshSegmentPressureService.class).asEagerSingleton(); + } } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b3843dfd114a9..cdad2c45638e5 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -120,6 +120,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.RetentionLeaseStats; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -941,14 +942,21 @@ public 
IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode + final DiscoveryNode sourceNode, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); + IndexShard indexShard = indexService.createShard( + shardRouting, + globalCheckpointSyncer, + retentionLeaseSyncer, + checkpointPublisher, + remoteRefreshSegmentPressureService + ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index f2a6583ae47bc..4a0fab82f9adc 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -56,6 +56,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.env.ShardLockObtainFailedException; @@ -64,6 +65,7 @@ import 
org.opensearch.index.IndexComponent; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -146,6 +148,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final SegmentReplicationCheckpointPublisher checkpointPublisher; + private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + @Inject public IndicesClusterStateService( final Settings settings, @@ -164,7 +168,8 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) { this( settings, @@ -183,7 +188,8 @@ public IndicesClusterStateService( snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, - retentionLeaseSyncer + retentionLeaseSyncer, + remoteRefreshSegmentPressureService ); } @@ -205,7 +211,8 @@ public IndicesClusterStateService( final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer + final RetentionLeaseSyncer retentionLeaseSyncer, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -215,6 +222,10 @@ public IndicesClusterStateService( ); indexEventListeners.add(segmentReplicationTargetService); 
indexEventListeners.add(segmentReplicationSourceService); + // if remote store feature is not enabled, do not wire the remote upload pressure service as an IndexEventListener. + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + indexEventListeners.add(remoteRefreshSegmentPressureService); + } this.segmentReplicationTargetService = segmentReplicationTargetService; this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); this.indicesService = indicesService; @@ -228,6 +239,7 @@ public IndicesClusterStateService( this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); + this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; } @Override @@ -657,7 +669,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR globalCheckpointSyncer, retentionLeaseSyncer, nodes.getLocalNode(), - sourceNode + sourceNode, + remoteRefreshSegmentPressureService ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1015,7 +1028,8 @@ T createShard( Consumer globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, - @Nullable DiscoveryNode sourceNode + @Nullable DiscoveryNode sourceNode, + RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException; /** diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index d412b5383bc89..cc7b5cb8dc845 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -77,6 +77,7 @@ import org.opensearch.index.mapper.Mapping; import 
org.opensearch.index.mapper.MetadataFieldMapper; import org.opensearch.index.mapper.RootObjectMapper; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -1072,6 +1073,7 @@ public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1102,6 +1104,7 @@ public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1132,6 +1135,7 @@ public void testHandlePrimaryTermValidationRequestSuccess() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1173,6 +1177,7 @@ private TransportShardBulkAction createAction() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index c5a6c0323a6f9..529f65ad21083 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ 
b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -149,7 +149,7 @@ public void testValidateSegmentUploadLag() { e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); assertTrue(e.getMessage().contains("failure_streak_count:11 min_consecutive_failure_threshold:10")); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); pressureService.validateSegmentsUploadLag(shardId); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java index 48bc28e3a497d..1510623050d40 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -240,9 +240,9 @@ public void testIncrementTotalUploadSucceeded() { pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() ); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(1, pressureTracker.getTotalUploadsSucceeded()); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(2, pressureTracker.getTotalUploadsSucceeded()); } @@ -257,7 +257,7 @@ public void testGetInflightUploads() { assertEquals(1, pressureTracker.getInflightUploads()); pressureTracker.incrementTotalUploadsStarted(); assertEquals(2, pressureTracker.getInflightUploads()); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(1, pressureTracker.getInflightUploads()); pressureTracker.incrementTotalUploadsFailed(); 
assertEquals(0, pressureTracker.getInflightUploads()); @@ -287,7 +287,7 @@ public void testGetConsecutiveFailureCount() { assertEquals(1, pressureTracker.getConsecutiveFailureCount()); pressureTracker.incrementTotalUploadsFailed(); assertEquals(2, pressureTracker.getConsecutiveFailureCount()); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(0, pressureTracker.getConsecutiveFailureCount()); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 84848bb87d634..04ca3c7c762bb 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -18,10 +18,15 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.threadpool.ThreadPool; @@ -40,7 +45,9 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; + private ClusterService clusterService; private RemoteStoreRefreshListener remoteStoreRefreshListener; + private RemoteRefreshSegmentPressureService 
remoteRefreshSegmentPressureService; public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard = newStartedShard( @@ -52,7 +59,15 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { indexDocs(1, numberOfDocs); indexShard.refresh("test"); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, remoteRefreshSegmentPressureService); + remoteStoreRefreshListener.beforeRefresh(); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -63,11 +78,9 @@ private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @After public void tearDown() throws Exception { - if (indexShard != null) { - Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - closeShards(indexShard); - } + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + closeShards(indexShard); super.tearDown(); } @@ -253,7 +266,39 @@ public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception { assertBusy(() -> assertEquals(0, successLatch.getCount())); } - private void mockIndexShardWithRetryAndScheduleRefresh( + public void testTrackerData() throws Exception { + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); + RemoteStoreRefreshListener listener = tuple.v1(); + 
RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteRefreshSegmentTracker tracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + assertNoLag(tracker); + indexDocs(100, randomIntBetween(100, 200)); + indexShard.refresh("test"); + listener.afterRefresh(true); + assertBusy(() -> assertNoLag(tracker)); + } + + private void assertNoLag(RemoteRefreshSegmentTracker tracker) { + assertEquals(0, tracker.getRefreshSeqNoLag()); + assertEquals(0, tracker.getBytesLag()); + assertEquals(0, tracker.getTimeMsLag()); + assertEquals(0, tracker.getRejectionCount()); + assertEquals(tracker.getUploadBytesStarted(), tracker.getUploadBytesSucceeded()); + assertTrue(tracker.getUploadBytesStarted() > 0); + assertEquals(0, tracker.getUploadBytesFailed()); + assertEquals(0, tracker.getInflightUploads()); + assertEquals(tracker.getTotalUploadsStarted(), tracker.getTotalUploadsSucceeded()); + assertTrue(tracker.getTotalUploadsStarted() > 0); + assertEquals(0, tracker.getTotalUploadsFailed()); + } + + private Tuple mockIndexShardWithRetryAndScheduleRefresh( + int succeedOnAttempt + ) throws IOException { + return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, null, null); + } + + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch @@ -316,8 +361,22 @@ private void mockIndexShardWithRetryAndScheduleRefresh( return indexShard.getEngine(); }).when(shard).getEngine(); - RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard); - refreshListener.afterRefresh(false); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService( + clusterService, + Settings.EMPTY + ); + 
when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); + when(shard.shardId()).thenReturn(indexShard.shardId()); + remoteRefreshSegmentPressureService.afterIndexShardCreated(shard); + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, remoteRefreshSegmentPressureService); + refreshListener.beforeRefresh(); + refreshListener.afterRefresh(true); + return Tuple.tuple(refreshListener, remoteRefreshSegmentPressureService); } private static class TestFilterDirectory extends FilterDirectory { @@ -325,7 +384,7 @@ private static class TestFilterDirectory extends FilterDirectory { /** * Sole constructor, typically called from sub-classes. * - * @param in + * @param in Directory */ protected TestFilterDirectory(Directory in) { super(in); diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0989bf869f18e..213a22539971f 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -153,7 +153,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + null ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 0619e3e3f62a2..c8e0460758df1 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ 
b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -46,6 +46,7 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -262,7 +263,8 @@ public MockIndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode + final DiscoveryNode sourceNode, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 22a5194b50f6d..6fa39fea4cbd9 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -581,7 +581,8 @@ private IndicesClusterStateService createIndicesClusterStateService( null, primaryReplicaSyncer, s -> {}, - RetentionLeaseSyncer.EMPTY + RetentionLeaseSyncer.EMPTY, + null ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 3f5ef4b824afa..01e931dfcb629 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -171,6 +171,7 @@ import 
org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; @@ -1901,7 +1902,8 @@ public void onFailure(final Exception e) { actionFilters ), RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + mock(RemoteRefreshSegmentPressureService.class) ); Map actions = new HashMap<>(); final SystemIndices systemIndices = new SystemIndices(emptyMap()); @@ -1952,6 +1954,7 @@ public void onFailure(final Exception e) { mock(ShardStateAction.class), mock(ThreadPool.class) ), + mock(RemoteRefreshSegmentPressureService.class), new SystemIndices(emptyMap()) ); actions.put( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index bb3b016560fa7..b609bfc8811e6 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -56,6 +56,7 @@ import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; @@ -89,6 +90,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import 
org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -169,6 +171,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, @@ -204,12 +207,14 @@ public void onFailure(ReplicationState state, ReplicationFailedException e, bool protected ThreadPool threadPool; protected long primaryTerm; + protected ClusterService clusterService; @Override public void setUp() throws Exception { super.setUp(); threadPool = setUpThreadPool(); primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards + clusterService = createClusterService(threadPool); failOnShardFailures(); } @@ -221,6 +226,7 @@ protected ThreadPool setUpThreadPool() { public void tearDown() throws Exception { try { tearDownThreadPool(); + clusterService.close(); } finally { super.tearDown(); } @@ -564,8 +570,13 @@ protected IndexShard newShard( Collections.emptyList(), clusterSettings ); - if (remoteStore == null && indexSettings.isRemoteStoreEnabled()) { - remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); + + RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = null; + if (indexSettings.isRemoteStoreEnabled()) { + if (remoteStore == null) { + remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); + } + remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, indexSettings.getSettings()); } final BiFunction translogFactorySupplier = (settings, shardRouting) -> { @@ -601,9 +612,13 @@ protected IndexShard newShard( breakerService, translogFactorySupplier, checkpointPublisher, - remoteStore + 
remoteStore, + remoteRefreshSegmentPressureService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); + if (remoteRefreshSegmentPressureService != null) { + remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + } success = true; } finally { if (success == false) { diff --git a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java index 0e47130e424cd..fcaf9f6c900d3 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java @@ -122,6 +122,8 @@ public long getFailureCount() { private final boolean skipExceptionOnListBlobs; + private final List skipExceptionOnBlobs; + private final boolean useLuceneCorruptionException; private final long maximumNumberOfFailures; @@ -182,6 +184,7 @@ public MockRepository( randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); skipExceptionOnVerificationFile = metadata.settings().getAsBoolean("skip_exception_on_verification_file", false); skipExceptionOnListBlobs = metadata.settings().getAsBoolean("skip_exception_on_list_blobs", false); + skipExceptionOnBlobs = metadata.settings().getAsList("skip_exception_on_blobs"); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false); @@ -370,12 +373,14 @@ private int hashCode(String path) { private void maybeIOExceptionOrBlock(String blobName) throws IOException { if (INDEX_LATEST_BLOB.equals(blobName) // Condition 1 || skipExceptionOnVerificationFiles(blobName) // Condition 2 - || skipExceptionOnListBlobs(blobName)) { // Condition 3 + || 
skipExceptionOnListBlobs(blobName) // Condition 3 + || skipExceptionOnBlob(blobName)) { // Condition 4 // Condition 1 - Don't mess with the index.latest blob here, failures to write to it are ignored by // upstream logic and we have specific tests that cover the error handling around this blob. // Condition 2 & 3 - This condition has been added to allow creation of repository which throws IO // exception during normal remote store operations. However, if we fail during verification as well, // then we can not add the repository as well. + // Condition 4 - This condition allows to skip exception on specific blobName or blobPrefix return; } if (blobName.startsWith("__")) { @@ -582,5 +587,9 @@ private boolean isVerificationFile(String blobName) { private boolean skipExceptionOnListBlobs(String blobName) { return skipExceptionOnListBlobs && DUMMY_FILE_NAME_LIST_BLOBS.equals(blobName); } + + private boolean skipExceptionOnBlob(String blobName) { + return skipExceptionOnBlobs.contains(blobName); + } } } From 66ed34de9960a16fce1de7a6bdbcf2e005b7289a Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 18 May 2023 14:29:54 +0530 Subject: [PATCH 2/9] Remove refresh seq no lag Signed-off-by: Ashish Singh --- .../common/settings/ClusterSettings.java | 1 - .../RemoteRefreshSegmentPressureService.java | 42 +------------------ .../RemoteRefreshSegmentPressureSettings.java | 15 +------ ...teRefreshSegmentPressureSettingsTests.java | 18 +------- 4 files changed, 4 insertions(+), 72 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 0f50f8ae5fef2..c5808c8e3851a 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -643,7 +643,6 @@ public void apply(Settings value, Settings current, Settings previous) { // Settings related to Remote Refresh 
Segment Pressure RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, - RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT, RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR, RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR, RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT, diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java index 0ce5acc5abd41..ff71c5292f1fd 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java @@ -50,10 +50,9 @@ public class RemoteRefreshSegmentPressureService implements IndexEventListener { public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settings settings) { pressureSettings = new RemoteRefreshSegmentPressureSettings(clusterService, settings, this); lagValidators = Arrays.asList( - new RefreshSeqNoLagValidator(pressureSettings), + new ConsecutiveFailureValidator(pressureSettings), new BytesLagValidator(pressureSettings), - new TimeLagValidator(pressureSettings), - new ConsecutiveFailureValidator(pressureSettings) + new TimeLagValidator(pressureSettings) ); } @@ -170,43 +169,6 @@ private LagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { abstract String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId); } - /** - * Check if the remote store seq no lag is above the min seq no lag limit - * - * @opensearch.internal - */ - private static class RefreshSeqNoLagValidator extends LagValidator { - - private static final String NAME = "refresh_seq_no_lag"; - - private RefreshSeqNoLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { - super(pressureSettings); - } - - @Override - public boolean 
validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { - // Check if the remote store seq no lag is above the min seq no lag limit - return pressureTracker.getRefreshSeqNoLag() <= pressureSettings.getMinRefreshSeqNoLagLimit(); - } - - @Override - String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { - return String.format( - Locale.ROOT, - "rejected execution on primary shard:%s due to remote segments lagging behind local segments." - + "remote_refresh_seq_no:%s local_refresh_seq_no:%s", - shardId, - pressureTracker.getRemoteRefreshSeqNo(), - pressureTracker.getLocalRefreshSeqNo() - ); - } - - @Override - String name() { - return NAME; - } - } - /** * Check if the remote store is lagging more than the upload bytes average multiplied by a variance factor * diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java index 6cb0d1d07e78b..3da21506d60e9 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java @@ -21,12 +21,10 @@ public class RemoteRefreshSegmentPressureSettings { private static class Defaults { - private static final long MIN_SEQ_NO_LAG_LIMIT = 5; - private static final long MIN_SEQ_NO_LAG_LIMIT_MIN_VALUE = 2; private static final double BYTES_LAG_VARIANCE_FACTOR = 2.0; private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 2.0; private static final double VARIANCE_FACTOR_MIN_VALUE = 1.0; - private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 10; + private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 5; private static final int MIN_CONSECUTIVE_FAILURES_LIMIT_MIN_VALUE = 1; private static final int UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE = 20; private static final int 
UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE = 20; @@ -41,14 +39,6 @@ private static class Defaults { Setting.Property.NodeScope ); - public static final Setting MIN_SEQ_NO_LAG_LIMIT = Setting.longSetting( - "remote_store.segment.pressure.seq_no_lag.limit", - Defaults.MIN_SEQ_NO_LAG_LIMIT, - Defaults.MIN_SEQ_NO_LAG_LIMIT_MIN_VALUE, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - public static final Setting BYTES_LAG_VARIANCE_FACTOR = Setting.doubleSetting( "remote_store.segment.pressure.bytes_lag.variance_factor", Defaults.BYTES_LAG_VARIANCE_FACTOR, @@ -123,9 +113,6 @@ public RemoteRefreshSegmentPressureSettings( this.remoteRefreshSegmentPressureEnabled = REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.get(settings); clusterSettings.addSettingsUpdateConsumer(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, this::setRemoteRefreshSegmentPressureEnabled); - this.minRefreshSeqNoLagLimit = MIN_SEQ_NO_LAG_LIMIT.get(settings); - clusterSettings.addSettingsUpdateConsumer(MIN_SEQ_NO_LAG_LIMIT, this::setMinRefreshSeqNoLagLimit); - this.bytesLagVarianceFactor = BYTES_LAG_VARIANCE_FACTOR.get(settings); clusterSettings.addSettingsUpdateConsumer(BYTES_LAG_VARIANCE_FACTOR, this::setBytesLagVarianceFactor); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java index 66b5d6c4c19d8..b3c05a50ac881 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java @@ -54,9 +54,6 @@ public void testGetDefaultSettings() { // Check remote refresh segment pressure enabled is false assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); - // Check min sequence number lag limit default value - assertEquals(5L, pressureSettings.getMinRefreshSeqNoLagLimit()); - // Check bytes lag 
variance threshold default value assertEquals(2.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); @@ -64,7 +61,7 @@ public void testGetDefaultSettings() { assertEquals(2.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d); // Check minimum consecutive failures limit default value - assertEquals(10, pressureSettings.getMinConsecutiveFailuresLimit()); + assertEquals(5, pressureSettings.getMinConsecutiveFailuresLimit()); // Check upload bytes moving average window size default value assertEquals(20, pressureSettings.getUploadBytesMovingAverageWindowSize()); @@ -79,7 +76,6 @@ public void testGetDefaultSettings() { public void testGetConfiguredSettings() { Settings settings = Settings.builder() .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) @@ -96,9 +92,6 @@ public void testGetConfiguredSettings() { // Check remote refresh segment pressure enabled is true assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); - // Check min sequence number lag limit configured value - assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit()); - // Check bytes lag variance threshold configured value assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); @@ -127,7 +120,6 @@ public void testUpdateAfterGetDefaultSettings() { Settings newSettings = Settings.builder() .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) 
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) @@ -140,9 +132,6 @@ public void testUpdateAfterGetDefaultSettings() { // Check updated remote refresh segment pressure enabled is false assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); - // Check min sequence number lag limit - assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit()); - // Check bytes lag variance threshold updated assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); @@ -165,7 +154,6 @@ public void testUpdateAfterGetDefaultSettings() { public void testUpdateAfterGetConfiguredSettings() { Settings settings = Settings.builder() .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) @@ -180,7 +168,6 @@ public void testUpdateAfterGetConfiguredSettings() { ); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 80) .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 40.0) .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 50.0) .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111) @@ -194,9 +181,6 @@ public void testUpdateAfterGetConfiguredSettings() { // Check updated remote refresh segment pressure enabled is true assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); - // Check min sequence number lag limit - assertEquals(80L, pressureSettings.getMinRefreshSeqNoLagLimit()); - // 
Check bytes lag variance threshold updated assertEquals(40.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); From 5250f5268ae603daaf0893ae03fd448c6f031c6f Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 18 May 2023 16:17:46 +0530 Subject: [PATCH 3/9] Remove seq no lag condition from tests Signed-off-by: Ashish Singh --- ...RemoteRefreshSegmentPressureServiceTests.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index 529f65ad21083..920607ffc8b16 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -99,14 +99,10 @@ public void testValidateSegmentUploadLag() { pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); - // 1. Seq no - add data points to the pressure tracker RemoteRefreshSegmentTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); pressureTracker.updateLocalRefreshSeqNo(6); - Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); - assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("remote_refresh_seq_no:0 local_refresh_seq_no:6")); - // 2. time lag more than dynamic threshold + // 1. 
time lag more than dynamic threshold pressureTracker.updateRemoteRefreshSeqNo(3); AtomicLong sum = new AtomicLong(); IntStream.range(0, 20).forEach(i -> { @@ -117,14 +113,14 @@ public void testValidateSegmentUploadLag() { long currentMs = System.nanoTime() / 1_000_000; pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 4 * avg)); pressureTracker.updateRemoteRefreshTimeMs(currentMs); - e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); + Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); assertTrue(e.getMessage().contains("time_lag:38 ms dynamic_time_lag_threshold:19.0 ms")); pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg)); pressureService.validateSegmentsUploadLag(shardId); - // 3. bytes lag more than dynamic threshold + // 2. bytes lag more than dynamic threshold sum.set(0); IntStream.range(0, 20).forEach(i -> { pressureTracker.addUploadBytes(i); @@ -142,13 +138,13 @@ public void testValidateSegmentUploadLag() { pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); pressureService.validateSegmentsUploadLag(shardId); - // 4. Consecutive failures more than the limit - IntStream.range(0, 10).forEach(ignore -> pressureTracker.incrementTotalUploadsFailed()); + // 3. 
Consecutive failures more than the limit + IntStream.range(0, 5).forEach(ignore -> pressureTracker.incrementTotalUploadsFailed()); pressureService.validateSegmentsUploadLag(shardId); pressureTracker.incrementTotalUploadsFailed(); e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("failure_streak_count:11 min_consecutive_failure_threshold:10")); + assertTrue(e.getMessage().contains("failure_streak_count:6 min_consecutive_failure_threshold:5")); pressureTracker.incrementTotalUploadsSucceeded(); pressureService.validateSegmentsUploadLag(shardId); } From 345409cc4a79b791af4638a094b1797522e0cfab Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 18 May 2023 18:23:20 +0530 Subject: [PATCH 4/9] Address review comments Signed-off-by: Ashish Singh --- ...emoteStoreMockRepositoryIntegTestCase.java | 149 +++++++++++++++++ .../RemoteStoreBackpressureIT.java | 43 +++++ .../RemoteStoreRefreshListenerIT.java | 155 +----------------- .../RemoteRefreshSegmentPressureService.java | 5 + .../remote/RemoteRefreshSegmentTracker.java | 14 +- .../opensearch/index/shard/IndexShard.java | 2 +- .../shard/RemoteStoreRefreshListener.java | 52 +++--- .../RemoteRefreshSegmentTrackerTests.java | 6 +- .../RemoteStoreRefreshListenerTests.java | 4 +- 9 files changed, 231 insertions(+), 199 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java new file mode 
100644 index 0000000000000..316a4a68a081a --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java @@ -0,0 +1,149 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.Before; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.util.FileSystemUtils; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; +import org.opensearch.test.FeatureFlagSetter; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase { + + protected static final String REPOSITORY_NAME = "my-segment-repo-1"; + protected static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .put(FeatureFlags.REMOTE_STORE, "true") + .build(); + } + + @Before + public void setup() { + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + internalCluster().startClusterManagerOnlyNode(); + } + + 
@Override + public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } + + protected Settings remoteStoreIndexSettings(int numberOfReplicas) { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + + protected void deleteRepo() { + logger.info("--> Deleting the repository={}", REPOSITORY_NAME); + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) { + logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation); + // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in + /// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the + // repository creation can happen without failure. 
+ createRepository( + REPOSITORY_NAME, + "mock", + Settings.builder() + .put("location", repoLocation) + .put("random_control_io_exception_rate", ioFailureRate) + .put("skip_exception_on_verification_file", true) + .put("skip_exception_on_list_blobs", true) + // Skipping is required for metadata as it is part of recovery + .put("skip_exception_on_blobs", skipExceptionBlobList) + .put("max_failure_number", Long.MAX_VALUE) + ); + + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME); + logger.info("--> Created index={}", INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + logger.info("--> Cluster is yellow with no initializing shards"); + ensureGreen(INDEX_NAME); + logger.info("--> Cluster is green"); + } + + /** + * Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc. + * + * @param location the path to location where segment files are being searched. + * @return set of file names of all segment file or empty set if there was IOException thrown. 
+ */ + protected Set getSegmentFiles(Path location) { + try { + return Arrays.stream(FileSystemUtils.files(location)) + .filter(path -> path.getFileName().toString().startsWith("_")) + .map(path -> path.getFileName().toString()) + .map(this::getLocalSegmentFilename) + .collect(Collectors.toSet()); + } catch (IOException exception) { + logger.error("Exception occurred while getting segment files", exception); + } + return Collections.emptySet(); + } + + private String getLocalSegmentFilename(String remoteFilename) { + return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0]; + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + protected void indexData(int numberOfIterations, boolean invokeFlush) { + logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush); + for (int i = 0; i < numberOfIterations; i++) { + int numberOfOperations = randomIntBetween(1, 5); + logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i); + for (int j = 0; j < numberOfOperations; j++) { + indexSingleDoc(); + } + if (invokeFlush) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java new file mode 100644 index 0000000000000..c3e4af0f05e35 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { + + public void testWritesRejected() { + Path location = randomRepoPath().toAbsolutePath(); + setup(location, 1d, "metadata"); + + Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build(); + ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(request) + .get(); + assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true"); + + logger.info("--> Indexing data"); + OpenSearchRejectedExecutionException ex = assertThrows( + OpenSearchRejectedExecutionException.class, + () -> indexData(randomIntBetween(10, 20), randomBoolean()) + ); + assertTrue(ex.getMessage().contains("rejected execution on primary shard")); + deleteRepo(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 3833ed6e4e1bd..eb95c2a270d1a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -8,80 +8,21 @@ package 
org.opensearch.remotestore; -import org.junit.Before; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.core.util.FileSystemUtils; -import org.opensearch.index.IndexModule; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; -import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchIntegTestCase; -import java.io.IOException; import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collections; import java.util.Locale; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreRefreshListenerIT extends AbstractSnapshotIntegTestCase { - - private static final String REPOSITORY_NAME = "my-segment-repo-1"; - private static final String INDEX_NAME = "remote-store-test-idx-1"; - - @Override - protected Settings featureFlagSettings() { - return Settings.builder() - .put(super.featureFlagSettings()) - .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") - 
.put(FeatureFlags.REMOTE_STORE, "true") - .build(); - } - - @Before - public void setup() { - FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); - internalCluster().startClusterManagerOnlyNode(); - } - - @Override - public Settings indexSettings() { - return remoteStoreIndexSettings(0); - } - - private Settings remoteStoreIndexSettings(int numberOfReplicas) { - return Settings.builder() - .put(super.indexSettings()) - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) - .build(); - } - - public void deleteRepo() { - logger.info("--> Deleting the repository={}", REPOSITORY_NAME); - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); - } +public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { public void testRemoteRefreshRetryOnFailure() throws Exception { @@ -111,27 +52,6 @@ public void testRemoteRefreshRetryOnFailure() throws Exception { deleteRepo(); } - public void testWritesRejected() { - Path location = randomRepoPath().toAbsolutePath(); - setup(location, 1d, "metadata"); - - Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build(); - ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(request) - .get(); - assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true"); - - logger.info("--> Indexing data"); - OpenSearchRejectedExecutionException ex = assertThrows( - OpenSearchRejectedExecutionException.class, - () -> 
indexData(randomIntBetween(10, 20), randomBoolean()) - ); - assertTrue(ex.getMessage().contains("rejected execution on primary shard")); - deleteRepo(); - } - public void testRemoteRefreshSegmentPressureSettingChanged() { Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build(); ClusterUpdateSettingsResponse response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get(); @@ -141,77 +61,4 @@ public void testRemoteRefreshSegmentPressureSettingChanged() { response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get(); assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "false"); } - - private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) { - logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation); - // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in - /// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the - // repository creation can happen without failure. 
- createRepository( - REPOSITORY_NAME, - "mock", - Settings.builder() - .put("location", repoLocation) - .put("random_control_io_exception_rate", ioFailureRate) - .put("skip_exception_on_verification_file", true) - .put("skip_exception_on_list_blobs", true) - // Skipping is required for metadata as it is part of recovery - .put("skip_exception_on_blobs", skipExceptionBlobList) - .put("max_failure_number", Long.MAX_VALUE) - ); - - internalCluster().startDataOnlyNodes(1); - createIndex(INDEX_NAME); - logger.info("--> Created index={}", INDEX_NAME); - ensureYellowAndNoInitializingShards(INDEX_NAME); - logger.info("--> Cluster is yellow with no initializing shards"); - ensureGreen(INDEX_NAME); - logger.info("--> Cluster is green"); - } - - /** - * Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc. - * - * @param location the path to location where segment files are being searched. - * @return set of file names of all segment file or empty set if there was IOException thrown. 
- */ - private Set getSegmentFiles(Path location) { - try { - return Arrays.stream(FileSystemUtils.files(location)) - .filter(path -> path.getFileName().toString().startsWith("_")) - .map(path -> path.getFileName().toString()) - .map(this::getLocalSegmentFilename) - .collect(Collectors.toSet()); - } catch (IOException exception) { - logger.error("Exception occurred while getting segment files", exception); - } - return Collections.emptySet(); - } - - private String getLocalSegmentFilename(String remoteFilename) { - return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0]; - } - - private IndexResponse indexSingleDoc() { - return client().prepareIndex(INDEX_NAME) - .setId(UUIDs.randomBase64UUID()) - .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) - .get(); - } - - private void indexData(int numberOfIterations, boolean invokeFlush) { - logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush); - for (int i = 0; i < numberOfIterations; i++) { - int numberOfOperations = randomIntBetween(1, 5); - logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i); - for (int j = 0; j < numberOfOperations; j++) { - indexSingleDoc(); - } - if (invokeFlush) { - flush(INDEX_NAME); - } else { - refresh(INDEX_NAME); - } - } - } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java index ff71c5292f1fd..d29db2820b8b4 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java @@ -101,6 +101,11 @@ public boolean isSegmentsUploadBackpressureEnabled() { return pressureSettings.isRemoteRefreshSegmentPressureEnabled(); } + /** + * Validates if segments are lagging more than the limits. 
If yes, it would lead to rejections of the requests. + * + * @param shardId shardId for which the validation needs to be done. + */ public void validateSegmentsUploadLag(ShardId shardId) { RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId); // This will be null for non-remote backed indexes diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index 82aa2cf76b53e..800cf176548a0 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -120,7 +120,7 @@ public class RemoteRefreshSegmentTracker { /** * Set of names of segment files that were uploaded as part of the most recent remote refresh. */ - private final Set latestUploadFiles = new HashSet<>(); + private final Set latestUploadedFiles = new HashSet<>(); /** * Keeps the bytes lag computed so that we do not compute it for every request. 
@@ -340,14 +340,14 @@ public void setLatestLocalFileNameLengthMap(Map latestLocalFileNam computeBytesLag(); } - public void addToLatestUploadFiles(String file) { - this.latestUploadFiles.add(file); + public void addToLatestUploadedFiles(String file) { + this.latestUploadedFiles.add(file); computeBytesLag(); } - public void setLatestUploadFiles(Set files) { - this.latestUploadFiles.clear(); - this.latestUploadFiles.addAll(files); + public void setLatestUploadedFiles(Set files) { + this.latestUploadedFiles.clear(); + this.latestUploadedFiles.addAll(files); computeBytesLag(); } @@ -357,7 +357,7 @@ private void computeBytesLag() { } Set filesNotYetUploaded = latestLocalFileNameLengthMap.keySet() .stream() - .filter(f -> !latestUploadFiles.contains(f)) + .filter(f -> !latestUploadedFiles.contains(f)) .collect(Collectors.toSet()); this.bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 031c14e44810b..8b542be222f25 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3554,7 +3554,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro this, // Add the checkpoint publisher if the Segment Replciation via remote store is enabled. indexSettings.isSegRepWithRemoteEnabled() ? 
this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService + remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()) ) ); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 66458daac3594..eb40b17e156c2 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -25,14 +25,13 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.threadpool.Scheduler; -import org.opensearch.threadpool.ThreadPool; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; @@ -89,7 +88,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final IndexShard indexShard; private final Directory storeDirectory; private final RemoteSegmentStoreDirectory remoteDirectory; - private final RemoteRefreshSegmentPressureService pressureService; + private final RemoteRefreshSegmentTracker segmentTracker; private final Map localSegmentChecksumMap; private long primaryTerm; @@ -117,7 +116,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres public RemoteStoreRefreshListener( IndexShard 
indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher, - RemoteRefreshSegmentPressureService pressureService + RemoteRefreshSegmentTracker segmentTracker ) { this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); @@ -132,7 +131,7 @@ public RemoteStoreRefreshListener( logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); } } - this.pressureService = pressureService; + this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; } @@ -168,7 +167,6 @@ private synchronized void syncSegments(boolean isRetry) { return; } beforeSegmentsSync(isRetry); - RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime(); boolean shouldRetry = true; @@ -219,7 +217,7 @@ private synchronized void syncSegments(boolean isRetry) { segmentTracker.setLatestLocalFileNameLengthMap(currentLocalSizeMap); // Start the segments files upload - boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh, segmentTracker, currentLocalSizeMap); + boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh, currentLocalSizeMap); if (newSegmentsUploadStatus) { segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); @@ -232,9 +230,9 @@ private synchronized void syncSegments(boolean isRetry) { segmentInfos.getGeneration() ); // Update latest uploaded segment files name in segment tracker - segmentTracker.setLatestUploadFiles(currentLocalSizeMap.keySet()); + segmentTracker.setLatestUploadedFiles(currentLocalSizeMap.keySet()); // Update the remote refresh time and refresh seq no - 
updateRemoteRefreshTimeAndSeqNo(segmentTracker, refreshTimeMs, refreshSeqNo); + updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshSeqNo); clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); onSuccessfulSegmentsSync(); final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); @@ -265,13 +263,13 @@ private synchronized void syncSegments(boolean isRetry) { logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); } finally { // Update the segment tracker with the final upload status as seen at the end - updateFinalUploadStatusInSegmentTracker(segmentTracker, shouldRetry == false, bytesBeforeUpload, startTimeInNS); + updateFinalUploadStatusInSegmentTracker(shouldRetry == false, bytesBeforeUpload, startTimeInNS); } afterSegmentsSync(isRetry, shouldRetry); } /** - * // Clears the stale files from the latest local segment checksum map. + * Clears the stale files from the latest local segment checksum map. * * @param localSegmentsPostRefresh list of segment files present post refresh */ @@ -353,11 +351,7 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s } // Visible for testing - boolean uploadNewSegments( - Collection localSegmentsPostRefresh, - RemoteRefreshSegmentTracker segmentTracker, - Map sizeMap - ) throws IOException { + boolean uploadNewSegments(Collection localSegmentsPostRefresh, Map sizeMap) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. 
List filesToUpload = localSegmentsPostRefresh.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { @@ -384,7 +378,7 @@ boolean uploadNewSegments( remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); // Upload succeeded segmentTracker.addUploadBytesSucceeded(fileSize); - segmentTracker.addToLatestUploadFiles(file); + segmentTracker.addToLatestUploadedFiles(file); success = true; } catch (IOException e) { // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) @@ -422,7 +416,6 @@ private void deleteStaleCommits() { * Updates the last refresh time and refresh seq no which is seen by local store. */ private void updateLocalRefreshTimeAndSeqNo() { - RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); segmentTracker.updateLocalRefreshTimeMs(pendingRefreshTimeMs); segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1); } @@ -430,7 +423,7 @@ private void updateLocalRefreshTimeAndSeqNo() { /** * Updates the last refresh time and refresh seq no which is seen by remote store. 
*/ - private void updateRemoteRefreshTimeAndSeqNo(RemoteRefreshSegmentTracker segmentTracker, long refreshTimeMs, long refreshSeqNo) { + private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshSeqNo) { segmentTracker.updateRemoteRefreshTimeMs(refreshTimeMs); segmentTracker.updateRemoteRefreshSeqNo(refreshSeqNo); } @@ -465,22 +458,17 @@ private Map createSizeMap(Collection segmentFiles) { return sizeMap; } - private void updateFinalUploadStatusInSegmentTracker( - RemoteRefreshSegmentTracker statsTracker, - boolean uploadStatus, - long bytesBeforeUpload, - long startTimeInNS - ) { + private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { if (uploadStatus) { - long bytesUploaded = statsTracker.getUploadBytesSucceeded() - bytesBeforeUpload; + long bytesUploaded = segmentTracker.getUploadBytesSucceeded() - bytesBeforeUpload; long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L; - statsTracker.incrementTotalUploadsSucceeded(); - statsTracker.addUploadBytes(bytesUploaded); - statsTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / timeTakenInMS); - statsTracker.addUploadTimeMs(timeTakenInMS); + segmentTracker.incrementTotalUploadsSucceeded(); + segmentTracker.addUploadBytes(bytesUploaded); + segmentTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / timeTakenInMS); + segmentTracker.addUploadTimeMs(timeTakenInMS); } else { - statsTracker.incrementTotalUploadsFailed(); + segmentTracker.incrementTotalUploadsFailed(); } } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java index e06b800237750..4360fc0fe4011 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -309,17 +309,17 @@ public void 
testComputeBytesLag() { pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); assertEquals(205L, pressureTracker.getBytesLag()); - pressureTracker.addToLatestUploadFiles("a"); + pressureTracker.addToLatestUploadedFiles("a"); assertEquals(105L, pressureTracker.getBytesLag()); fileSizeMap.put("c", 115L); pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); assertEquals(220L, pressureTracker.getBytesLag()); - pressureTracker.addToLatestUploadFiles("b"); + pressureTracker.addToLatestUploadedFiles("b"); assertEquals(115L, pressureTracker.getBytesLag()); - pressureTracker.addToLatestUploadFiles("c"); + pressureTracker.addToLatestUploadedFiles("c"); assertEquals(0L, pressureTracker.getBytesLag()); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 39bbca17615b7..0300c0880e990 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -70,7 +70,7 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { remoteStoreRefreshListener = new RemoteStoreRefreshListener( indexShard, SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService + remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) ); remoteStoreRefreshListener.beforeRefresh(); } @@ -381,7 +381,7 @@ private Tuple m RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener( shard, SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService + remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) ); refreshListener.beforeRefresh(); refreshListener.afterRefresh(true); From 0d87c8c46ec5cc40da995face2faefc8e0531384 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 18 May 2023 
23:32:29 +0530 Subject: [PATCH 5/9] Address review comments Signed-off-by: Ashish Singh --- .../shard/RemoteStoreRefreshListener.java | 226 ++++++++++++------ .../RemoteStoreRefreshListenerTests.java | 2 - 2 files changed, 153 insertions(+), 75 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index eb40b17e156c2..88b71a92d7340 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -21,8 +21,10 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.common.CheckedFunction; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; @@ -38,6 +40,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -47,7 +50,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; import java.util.stream.Collectors; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; @@ -101,18 +103,15 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry; - /** - * This keeps the pending refresh time which is set before the refresh. This is used to set the last refresh time. 
- */ - private volatile long pendingRefreshTimeMs; - /** * Keeps track of segment files and their size in bytes which are part of the most recent refresh. */ - private final Map latestFileNameSizeOnLocalMap = new HashMap<>(); + private final Map latestFileNameSizeOnLocalMap = ConcurrentCollections.newConcurrentMap(); private final SegmentReplicationCheckpointPublisher checkpointPublisher; + private final FileUploader fileUploader; + public RemoteStoreRefreshListener( IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher, @@ -134,13 +133,30 @@ public RemoteStoreRefreshListener( this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; + this.fileUploader = new FileUploader(new UploadTracker() { + @Override + public void beforeUpload(String file) { + // Start tracking the upload bytes started + segmentTracker.addUploadBytesStarted(latestFileNameSizeOnLocalMap.get(file)); + } + + @Override + public void onSuccess(String file) { + // Track upload success + segmentTracker.addUploadBytesSucceeded(latestFileNameSizeOnLocalMap.get(file)); + segmentTracker.addToLatestUploadedFiles(file); + } + + @Override + public void onFailure(String file) { + // Track upload failure + segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file)); + } + }, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile); } @Override - public void beforeRefresh() throws IOException { - // Set the pending refresh time which gets set on account of success refresh. - pendingRefreshTimeMs = System.nanoTime() / 1_000_000L; - } + public void beforeRefresh() throws IOException {} /** * Upload new segment files created as part of the last refresh to the remote segment store. 
@@ -171,8 +187,6 @@ private synchronized void syncSegments(boolean isRetry) { long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime(); boolean shouldRetry = true; try { - // Start tracking total uploads started - segmentTracker.incrementTotalUploadsStarted(); if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { this.primaryTerm = indexShard.getOperationPrimaryTerm(); @@ -193,6 +207,7 @@ private synchronized void syncSegments(boolean isRetry) { // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); + long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); Collection localSegmentsPostRefresh = segmentInfos.files(true); List segmentInfosFiles = localSegmentsPostRefresh.stream() @@ -213,11 +228,10 @@ private synchronized void syncSegments(boolean isRetry) { .forEach(localSegmentsPostRefresh::remove); // Create a map of file name to size and update the refresh segment tracker - Map currentLocalSizeMap = createSizeMap(localSegmentsPostRefresh); - segmentTracker.setLatestLocalFileNameLengthMap(currentLocalSizeMap); + updateLocalSizeMapAndTracker(localSegmentsPostRefresh); // Start the segments files upload - boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh, currentLocalSizeMap); + boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh); if (newSegmentsUploadStatus) { segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); @@ -229,13 +243,8 @@ private synchronized void syncSegments(boolean isRetry) { indexShard.getOperationPrimaryTerm(), segmentInfos.getGeneration() ); - // Update latest uploaded segment files name in segment tracker - 
segmentTracker.setLatestUploadedFiles(currentLocalSizeMap.keySet()); - // Update the remote refresh time and refresh seq no - updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshSeqNo); clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); - onSuccessfulSegmentsSync(); - final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); + onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo); indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); checkpointPublisher.publish(indexShard, checkpoint); // At this point since we have uploaded new segments, segment infos and segment metadata file, @@ -285,9 +294,15 @@ private void beforeSegmentsSync(boolean isRetry) { if (isRetry) { logger.info("Retrying to sync the segments to remote store"); } + // Start tracking total uploads started + segmentTracker.incrementTotalUploadsStarted(); } - private void onSuccessfulSegmentsSync() { + private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) { + // Update latest uploaded segment files name in segment tracker + segmentTracker.setLatestUploadedFiles(latestFileNameSizeOnLocalMap.keySet()); + // Update the remote refresh time and refresh seq no + updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshSeqNo); // Reset the backoffDelayIterator for the future failures resetBackOffDelayIterator(); // Cancel the scheduled cancellable retry if possible and set it to null @@ -350,45 +365,14 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s return segmentInfoSnapshotFilename; } - // Visible for testing - boolean uploadNewSegments(Collection localSegmentsPostRefresh, Map sizeMap) throws IOException { + private boolean uploadNewSegments(Collection localSegmentsPostRefresh) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); - // Exclude files that are already uploaded and the exclude files to come up with the list of files 
to be uploaded. - List filesToUpload = localSegmentsPostRefresh.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { + localSegmentsPostRefresh.forEach(file -> { try { - return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); + fileUploader.uploadFile(file); } catch (IOException e) { - logger.info( - "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", - file - ); - return true; - } - }).collect(Collectors.toList()); - - // Start tracking the upload bytes started - filesToUpload.forEach(file -> segmentTracker.addUploadBytesStarted(sizeMap.get(file))); - - // Starting the uploads now - filesToUpload.forEach(file -> { - boolean success = false; - long fileSize = sizeMap.get(file); - try { - // Start upload to remote store - remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); - // Upload succeeded - segmentTracker.addUploadBytesSucceeded(fileSize); - segmentTracker.addToLatestUploadedFiles(file); - success = true; - } catch (IOException e) { - // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) + uploadSuccess.set(false); logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); - } finally { - if (success == false) { - uploadSuccess.set(false); - // Upload failed - segmentTracker.addUploadBytesFailed(fileSize); - } } }); return uploadSuccess.get(); @@ -416,7 +400,7 @@ private void deleteStaleCommits() { * Updates the last refresh time and refresh seq no which is seen by local store. 
*/ private void updateLocalRefreshTimeAndSeqNo() { - segmentTracker.updateLocalRefreshTimeMs(pendingRefreshTimeMs); + segmentTracker.updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L); segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1); } @@ -429,33 +413,33 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshSeq } /** - * Returns map of file name to size of the input segment files. Tries to reuse existing information by caching the size + * Updates map of file name to size of the input segment files. Tries to reuse existing information by caching the size * data, otherwise uses {@code storeDirectory.fileLength(file)} to get the size. This method also removes from the map * such files that are not present in the list of segment files given in the input. * * @param segmentFiles list of segment files for which size needs to be known - * @return the map as mentioned above */ - private Map createSizeMap(Collection segmentFiles) { - // Create a map of file name to size - Map sizeMap = segmentFiles.stream() + private void updateLocalSizeMapAndTracker(Collection segmentFiles) { + + // Update the map + segmentFiles.stream() .filter(file -> !EXCLUDE_FILES.contains(file)) - .collect(Collectors.toMap(Function.identity(), file -> { - if (latestFileNameSizeOnLocalMap.containsKey(file)) { - return latestFileNameSizeOnLocalMap.get(file); - } + .filter(file -> !latestFileNameSizeOnLocalMap.containsKey(file) || latestFileNameSizeOnLocalMap.get(file) == 0) + .forEach(file -> { long fileSize = 0; try { fileSize = storeDirectory.fileLength(file); - latestFileNameSizeOnLocalMap.put(file, fileSize); } catch (IOException e) { logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e); } - return fileSize; - })); + latestFileNameSizeOnLocalMap.put(file, fileSize); + }); + + Set fileSet = new HashSet<>(segmentFiles); // Remove keys from the fileSizeMap that do not exist in the latest 
segment files - latestFileNameSizeOnLocalMap.entrySet().removeIf(entry -> sizeMap.containsKey(entry.getKey()) == false); - return sizeMap; + latestFileNameSizeOnLocalMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false); + // Update the tracker + segmentTracker.setLatestLocalFileNameLengthMap(latestFileNameSizeOnLocalMap); } private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { @@ -471,4 +455,100 @@ private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long segmentTracker.incrementTotalUploadsFailed(); } } + + /** + * This class is a wrapper over the copying of file from local to remote store allowing to decorate the actual copy + * method along with adding hooks of code that can be run before, on success and on failure. + * + * @opensearch.internal + */ + private static class FileUploader { + + private final UploadTracker uploadTracker; + + private final RemoteSegmentStoreDirectory remoteDirectory; + + private final Directory storeDirectory; + + private final CheckedFunction checksumProvider; + + public FileUploader( + UploadTracker uploadTracker, + RemoteSegmentStoreDirectory remoteDirectory, + Directory storeDirectory, + CheckedFunction checksumProvider + ) { + this.uploadTracker = uploadTracker; + this.remoteDirectory = remoteDirectory; + this.storeDirectory = storeDirectory; + this.checksumProvider = checksumProvider; + } + + /** + * Calling this method leads to the beforeUpload hook getting executed first, followed by the actual upload. Based on the upload status, + * the onSuccess or onFailure method gets invoked. + * + * @param file the file which is to be uploaded. + * @throws IOException is thrown if the upload fails.
+ */ + private void uploadFile(String file) throws IOException { + if (skipUpload(file)) { + return; + } + uploadTracker.beforeUpload(file); + boolean success = false; + try { + performUpload(file); + uploadTracker.onSuccess(file); + success = true; + } finally { + if (!success) { + uploadTracker.onFailure(file); + } + } + } + + /** + * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. + * + * @param file that needs to be uploaded. + * @return true if the upload has to be skipped for the file. + */ + private boolean skipUpload(String file) { + try { + // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. + return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, checksumProvider.apply(file)); + } catch (IOException e) { + logger.error( + "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", + file + ); + } + return false; + } + + /** + * This method does the actual upload. + * + * @param file that needs to be uploaded. + * @throws IOException is thrown if the upload fails. + */ + private void performUpload(String file) throws IOException { + remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); + } + } + + /** + * A tracker class that is fed to FileUploader. 
+ * + * @opensearch.internal + */ + interface UploadTracker { + + void beforeUpload(String file); + + void onSuccess(String file); + + void onFailure(String file); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 0300c0880e990..c2c06f71d1985 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -72,7 +72,6 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { SegmentReplicationCheckpointPublisher.EMPTY, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) ); - remoteStoreRefreshListener.beforeRefresh(); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -383,7 +382,6 @@ private Tuple m SegmentReplicationCheckpointPublisher.EMPTY, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) ); - refreshListener.beforeRefresh(); refreshListener.afterRefresh(true); return Tuple.tuple(refreshListener, remoteRefreshSegmentPressureService); } From dc378e86e31f68524e271d91c47acac6377bab5d Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 19 May 2023 01:34:02 +0530 Subject: [PATCH 6/9] Empty-Commit Signed-off-by: Ashish Singh From 4803283f6e06a9389ceb217b3a144d7041fb5ca5 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 19 May 2023 14:04:58 +0530 Subject: [PATCH 7/9] Increase the default limit for variance factor Signed-off-by: Ashish Singh --- .../RemoteRefreshSegmentPressureService.java | 16 +++++++++------- .../RemoteRefreshSegmentPressureSettings.java | 4 ++-- ...RemoteRefreshSegmentPressureServiceTests.java | 8 ++++---- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git 
a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java index d29db2820b8b4..280381a7b6109 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java @@ -108,13 +108,9 @@ public boolean isSegmentsUploadBackpressureEnabled() { */ public void validateSegmentsUploadLag(ShardId shardId) { RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId); - // This will be null for non-remote backed indexes - if (remoteRefreshSegmentTracker == null) { - return; - } - // Check if refresh checkpoint (a.k.a. seq number) lag is 2 or below - this is to handle segment merges that can - // increase the bytes to upload almost suddenly. - if (remoteRefreshSegmentTracker.getRefreshSeqNoLag() <= 1) { + // condition 1 - This will be null for non-remote backed indexes + // condition 2 - This will be zero if the remote store is in sync with the local store + if (remoteRefreshSegmentTracker == null || remoteRefreshSegmentTracker.getRefreshSeqNoLag() == 0) { return; } @@ -189,6 +185,9 @@ private BytesLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) @Override public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + if (pressureTracker.getRefreshSeqNoLag() <= 1) { + return true; + } if (pressureTracker.isUploadBytesAverageReady() == false) { logger.trace("upload bytes moving average is not ready"); return true; } @@ -232,6 +231,9 @@ private TimeLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) @Override public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + if (pressureTracker.getRefreshSeqNoLag() <= 1) { + return true; + } if (pressureTracker.isUploadTimeMsAverageReady() == false) { logger.trace("upload time moving
average is not ready"); return true; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java index 3da21506d60e9..2a098b8f7a89b 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java @@ -21,8 +21,8 @@ public class RemoteRefreshSegmentPressureSettings { private static class Defaults { - private static final double BYTES_LAG_VARIANCE_FACTOR = 2.0; - private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 2.0; + private static final double BYTES_LAG_VARIANCE_FACTOR = 10.0; + private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 10.0; private static final double VARIANCE_FACTOR_MIN_VALUE = 1.0; private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 5; private static final int MIN_CONSECUTIVE_FAILURES_LIMIT_MIN_VALUE = 1; diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index 920607ffc8b16..1bab4bbfd9d31 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -111,11 +111,11 @@ public void testValidateSegmentUploadLag() { }); double avg = (double) sum.get() / 20; long currentMs = System.nanoTime() / 1_000_000; - pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 4 * avg)); + pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 12 * avg)); pressureTracker.updateRemoteRefreshTimeMs(currentMs); Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); 
assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("time_lag:38 ms dynamic_time_lag_threshold:19.0 ms")); + assertTrue(e.getMessage().contains("time_lag:114 ms dynamic_time_lag_threshold:95.0 ms")); pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg)); pressureService.validateSegmentsUploadLag(shardId); @@ -128,11 +128,11 @@ public void testValidateSegmentUploadLag() { }); avg = (double) sum.get() / 20; Map nameSizeMap = new HashMap<>(); - nameSizeMap.put("a", (long) (4 * avg)); + nameSizeMap.put("a", (long) (12 * avg)); pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); - assertTrue(e.getMessage().contains("bytes_lag:38 dynamic_bytes_lag_threshold:19.0")); + assertTrue(e.getMessage().contains("bytes_lag:114 dynamic_bytes_lag_threshold:95.0")); nameSizeMap.put("a", (long) (2 * avg)); pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); From 6dad1a47e410de7e1b56ddf59745235b65073e84 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 19 May 2023 15:43:34 +0530 Subject: [PATCH 8/9] Fix failing test & incorporate comments Signed-off-by: Ashish Singh --- ...emoteStoreMockRepositoryIntegTestCase.java | 6 +-- ...java => RemoteStoreBaseIntegTestCase.java} | 2 +- .../opensearch/remotestore/RemoteStoreIT.java | 2 +- .../ReplicaToPrimaryPromotionIT.java | 2 +- ...teRefreshSegmentPressureSettingsTests.java | 4 +- .../RemoteStoreRefreshListenerTests.java | 44 +++++++++++++++++-- 6 files changed, 46 insertions(+), 14 deletions(-) rename server/src/internalClusterTest/java/org/opensearch/remotestore/{RemoteStoreBaseIT.java => RemoteStoreBaseIntegTestCase.java} (97%) diff --git 
a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java index 316a4a68a081a..dc312ffa6676d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java @@ -38,11 +38,7 @@ public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends Abs @Override protected Settings featureFlagSettings() { - return Settings.builder() - .put(super.featureFlagSettings()) - .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") - .put(FeatureFlags.REMOTE_STORE, "true") - .build(); + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); } @Before diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java similarity index 97% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIT.java rename to server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index bdaa5af222459..0914506e632dd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -26,7 +26,7 @@ import static java.util.Arrays.asList; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -public class RemoteStoreBaseIT extends OpenSearchIntegTestCase { +public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remore-store-repo"; protected static 
final int SHARD_COUNT = 1; protected static final int REPLICA_COUNT = 1; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 41ae0da9ccb72..f069950c11f17 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -34,7 +34,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreIT extends RemoteStoreBaseIT { +public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAME = "remote-store-test-idx-1"; private static final String TOTAL_OPERATIONS = "total-operations"; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java index fe8f3612be6bf..712747f7479ae 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java @@ -28,7 +28,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0) -public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIT { +public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIntegTestCase { private int shard_count = 5; @Override diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java index b3c05a50ac881..75b5b946e8bf8 100644 --- 
a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java @@ -55,10 +55,10 @@ public void testGetDefaultSettings() { assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); // Check bytes lag variance threshold default value - assertEquals(2.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); + assertEquals(10.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); // Check time lag variance threshold default value - assertEquals(2.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d); + assertEquals(10.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d); // Check minimum consecutive failures limit default value assertEquals(5, pressureSettings.getMinConsecutiveFailuresLimit()); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 9fbe2143da196..365fb0237f80f 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -30,6 +30,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -42,6 +43,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX; public class 
RemoteStoreRefreshListenerTests extends IndexShardTestCase { @@ -237,9 +239,19 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch + ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); + RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + assertEquals(0, segmentTracker.getBytesLag()); + assertEquals(0, segmentTracker.getRefreshSeqNoLag()); + assertEquals(0, segmentTracker.getTimeMsLag()); + assertEquals(0, segmentTracker.getTotalUploadsFailed()); } public void testRefreshSuccessOnSecondAttempt() throws Exception { @@ -251,9 +263,19 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. 
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch + ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); + RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + assertEquals(0, segmentTracker.getBytesLag()); + assertEquals(0, segmentTracker.getRefreshSeqNoLag()); + assertEquals(0, segmentTracker.getTimeMsLag()); + assertEquals(1, segmentTracker.getTotalUploadsFailed()); } public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception { @@ -265,9 +287,20 @@ public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. 
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch + ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); + RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + assertEquals(0, segmentTracker.getBytesLag()); + assertEquals(0, segmentTracker.getRefreshSeqNoLag()); + assertEquals(0, segmentTracker.getTimeMsLag()); + assertEquals(2, segmentTracker.getTotalUploadsFailed()); + } public void testTrackerData() throws Exception { @@ -310,7 +343,10 @@ private Tuple m // Create index shard that we will be using to mock different methods in IndexShard for the unit test indexShard = newStartedShard( true, - Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + Settings.builder() + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(), new InternalEngineFactory() ); From 077aa994374961a347ba2c50d2a67ace3a4c4173 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 19 May 2023 22:47:03 +0530 Subject: [PATCH 9/9] Empty-Commit Signed-off-by: Ashish Singh