From 048398f72f5c163ce52cddf425c7fbdd259eb87f Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 8 May 2023 13:47:37 +0530 Subject: [PATCH] Add backpressure in write path on remote segments lag Signed-off-by: Ashish Singh --- .../opensearch/index/shard/IndexShardIT.java | 1 + .../action/bulk/TransportShardBulkAction.java | 15 +- .../org/opensearch/index/IndexService.java | 7 +- .../remote/RemoteRefreshSegmentTracker.java | 57 ++-- .../opensearch/index/shard/IndexShard.java | 9 +- .../shard/RemoteStoreRefreshListener.java | 297 +++++++++++++----- .../org/opensearch/indices/IndicesModule.java | 2 + .../opensearch/indices/IndicesService.java | 12 +- .../cluster/IndicesClusterStateService.java | 24 +- .../bulk/TransportShardBulkActionTests.java | 5 + ...oteRefreshSegmentPressureServiceTests.java | 2 +- .../RemoteRefreshSegmentTrackerTests.java | 8 +- .../RemoteStoreRefreshListenerTests.java | 40 ++- ...dicesLifecycleListenerSingleNodeTests.java | 3 +- ...actIndicesClusterStateServiceTestCase.java | 4 +- ...ClusterStateServiceRandomUpdatesTests.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 5 +- .../index/shard/IndexShardTestCase.java | 21 +- 18 files changed, 375 insertions(+), 140 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 11f187ac6e619..ba567c125c6e9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -701,6 +701,7 @@ public static final IndexShard newIndexShard( cbs, (indexSettings, shardRouting) -> new InternalTranslogFactory(), SegmentReplicationCheckpointPublisher.EMPTY, + null, null ); } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 6af512b011653..5230eab2d25a4 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -88,6 +88,7 @@ import org.opensearch.index.mapper.MapperException; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; @@ -135,6 +136,7 @@ public class TransportShardBulkAction extends TransportWriteAction globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -506,7 +508,8 @@ public synchronized IndexShard createShard( circuitBreakerService, translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, - remoteStore + remoteStore, + remoteRefreshSegmentPressureService ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index 109eadf34509b..4b2a24bd798b8 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -169,28 +169,28 @@ ShardId getShardId() { return shardId; } - long getLocalRefreshSeqNo() { + public long getLocalRefreshSeqNo() { return localRefreshSeqNo; } - void updateLocalRefreshSeqNo(long localRefreshSeqNo) { - assert localRefreshSeqNo > this.localRefreshSeqNo : "newLocalRefreshSeqNo=" + public void updateLocalRefreshSeqNo(long localRefreshSeqNo) { + assert localRefreshSeqNo >= this.localRefreshSeqNo : "newLocalRefreshSeqNo=" + localRefreshSeqNo - + ">=" + + " < " + "currentLocalRefreshSeqNo=" + this.localRefreshSeqNo; this.localRefreshSeqNo = localRefreshSeqNo; computeRefreshSeqNoLag(); } - long getLocalRefreshTimeMs() { + public long getLocalRefreshTimeMs() { return localRefreshTimeMs; } - void updateLocalRefreshTimeMs(long localRefreshTimeMs) { - assert localRefreshTimeMs > this.localRefreshTimeMs : "newLocalRefreshTimeMs=" + public void updateLocalRefreshTimeMs(long localRefreshTimeMs) { + assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs=" + localRefreshTimeMs - + ">=" + + " < " + "currentLocalRefreshTimeMs=" + this.localRefreshTimeMs; this.localRefreshTimeMs = localRefreshTimeMs; @@ -201,10 +201,10 @@ long getRemoteRefreshSeqNo() { return remoteRefreshSeqNo; } - void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) { - assert remoteRefreshSeqNo > this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo=" + public void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) { + assert remoteRefreshSeqNo >= this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo=" + remoteRefreshSeqNo - + ">=" + + " < " + "currentRemoteRefreshSeqNo=" + this.remoteRefreshSeqNo; this.remoteRefreshSeqNo = remoteRefreshSeqNo; @@ -215,10 +215,10 @@ long getRemoteRefreshTimeMs() { return remoteRefreshTimeMs; } - void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { - assert remoteRefreshTimeMs > this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs=" + public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { + assert remoteRefreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs=" + remoteRefreshTimeMs - + ">=" + + " < " + "currentRemoteRefreshTimeMs=" + this.remoteRefreshTimeMs; this.remoteRefreshTimeMs = remoteRefreshTimeMs; @@ -249,7 +249,7 @@ long getUploadBytesStarted() { return uploadBytesStarted; } - void addUploadBytesStarted(long size) { + public void addUploadBytesStarted(long size) { uploadBytesStarted += size; } @@ -257,15 +257,15 @@ long getUploadBytesFailed() { return uploadBytesFailed; } - void addUploadBytesFailed(long size) { + public void addUploadBytesFailed(long size) { uploadBytesFailed += size; } - long getUploadBytesSucceeded() { + public long getUploadBytesSucceeded() { return uploadBytesSucceeded; } - void addUploadBytesSucceeded(long size) { + public void addUploadBytesSucceeded(long size) { uploadBytesSucceeded += size; } @@ -277,7 +277,7 @@ long getTotalUploadsStarted() { return totalUploadsStarted; } - void incrementTotalUploadsStarted() { + public void incrementTotalUploadsStarted() { totalUploadsStarted += 1; } @@ -285,7 +285,7 @@ long getTotalUploadsFailed() { return totalUploadsFailed; } - void incrementTotalUploadsFailed() { + public void incrementTotalUploadsFailed() { totalUploadsFailed += 1; failures.record(true); } @@ -294,7 +294,7 @@ long getTotalUploadsSucceeded() { return totalUploadsSucceeded; } - void incrementTotalUploadSucceeded() { + public void incrementTotalUploadsSucceeded() { totalUploadsSucceeded += 1; failures.record(false); } @@ -323,16 +323,21 @@ Map getLatestLocalFileNameLengthMap() { return latestLocalFileNameLengthMap; } - void setLatestLocalFileNameLengthMap(Map latestLocalFileNameLengthMap) { + public void setLatestLocalFileNameLengthMap(Map latestLocalFileNameLengthMap) { this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap; computeBytesLag(); } - void addToLatestUploadFiles(String file) { + public void addToLatestUploadFiles(String file) { this.latestUploadFiles.add(file); computeBytesLag(); } + public void setLatestUploadFiles(Set files) { + this.latestUploadFiles.clear(); + this.latestUploadFiles.addAll(files); + } + private void computeBytesLag() { if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) { return; @@ -356,7 +361,7 @@ boolean isUploadBytesAverageReady() { return uploadBytesMovingAverageReference.get().getAverage(); } - void addUploadBytes(long size) { + public void addUploadBytes(long size) { synchronized (uploadBytesMutex) { this.uploadBytesMovingAverageReference.get().record(size); } @@ -381,7 +386,7 @@ boolean isUploadBytesPerSecAverageReady() { return uploadBytesPerSecMovingAverageReference.get().getAverage(); } - void addUploadBytesPerSec(long bytesPerSec) { + public void addUploadBytesPerSec(long bytesPerSec) { synchronized (uploadBytesPerSecMutex) { this.uploadBytesPerSecMovingAverageReference.get().record(bytesPerSec); } @@ -406,7 +411,7 @@ boolean isUploadTimeMsAverageReady() { return uploadTimeMsMovingAverageReference.get().getAverage(); } - void addUploadTimeMs(long timeMs) { + public void addUploadTimeMs(long timeMs) { synchronized (uploadTimeMsMutex) { this.uploadTimeMsMovingAverageReference.get().record(timeMs); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 3453ee196077c..f19383dfcc135 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -143,6 +143,7 @@ import org.opensearch.index.merge.MergeStats; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.seqno.ReplicationTracker; @@ -326,8 +327,8 @@ Runnable getGlobalCheckpointSyncer() { private volatile boolean useRetentionLeasesInPeerRecovery; private final Store remoteStore; private final BiFunction translogFactorySupplier; - private final boolean isTimeSeriesIndex; + private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; public IndexShard( final ShardRouting shardRouting, @@ -352,7 +353,8 @@ public IndexShard( final CircuitBreakerService circuitBreakerService, final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, - @Nullable final Store remoteStore + @Nullable final Store remoteStore, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -447,6 +449,7 @@ public boolean shouldCache(Query query) { this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null) ? false : mapperService.documentMapper().mappers().containsTimeStampField(); + this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; } public ThreadPool getThreadPool() { @@ -3548,7 +3551,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro final List internalRefreshListener = new ArrayList<>(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); if (isRemoteStoreEnabled()) { - internalRefreshListener.add(new RemoteStoreRefreshListener(this)); + internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteRefreshSegmentPressureService)); } if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 2f8757015f208..957ec04db4202 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -25,6 +25,8 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.threadpool.Scheduler; @@ -44,6 +46,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; @@ -84,6 +87,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final IndexShard indexShard; private final Directory storeDirectory; private final RemoteSegmentStoreDirectory remoteDirectory; + private final RemoteRefreshSegmentPressureService pressureService; private final Map localSegmentChecksumMap; private long primaryTerm; @@ -96,7 +100,17 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry; - public RemoteStoreRefreshListener(IndexShard indexShard) { + /** + * This keeps the pending refresh time which is set before the refresh. This is used to set the last refresh time. + */ + private volatile long pendingRefreshTimeMs; + + /** + * Keeps track of segment files and their size in bytes which are part of the most recent refresh. + */ + private final Map latestFileNameSizeOnLocalMap = new HashMap<>(); + + public RemoteStoreRefreshListener(IndexShard indexShard, RemoteRefreshSegmentPressureService pressureService) { this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -110,12 +124,14 @@ public RemoteStoreRefreshListener(IndexShard indexShard) { logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); } } + this.pressureService = pressureService; resetBackOffDelayIterator(); } @Override public void beforeRefresh() throws IOException { - // Do Nothing + // Set the pending refresh time which gets set on account of success refresh. + pendingRefreshTimeMs = System.nanoTime() / 1_000_000L; } /** @@ -126,6 +142,11 @@ public void beforeRefresh() throws IOException { */ @Override public void afterRefresh(boolean didRefresh) { + + if (didRefresh) { + updateLocalRefreshTimeAndSeqNo(); + } + try { indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get(); } catch (InterruptedException | ExecutionException e) { @@ -134,95 +155,122 @@ public void afterRefresh(boolean didRefresh) { } private synchronized void syncSegments(boolean isRetry) { - boolean shouldRetry = false; + if (indexShard.getReplicationTracker().isPrimaryMode() == false) { + return; + } beforeSegmentsSync(isRetry); + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); + long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime(); + boolean shouldRetry = true; try { - if (indexShard.getReplicationTracker().isPrimaryMode()) { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); + // Start tracking total uploads started + segmentTracker.incrementTotalUploadsStarted(); + + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + this.remoteDirectory.init(); + } + try { + // if a new segments_N file is present in local that is not uploaded to remote store yet, it + // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. + // This is done to avoid delete post each refresh. + // Ideally, we want this to be done in async flow. (GitHub issue #4315) + if (isRefreshAfterCommit()) { + deleteStaleCommits(); } - try { - // if a new segments_N file is present in local that is not uploaded to remote store yet, it - // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. - // This is done to avoid delete post each refresh. - // Ideally, we want this to be done in async flow. (GitHub issue #4315) - if (isRefreshAfterCommit()) { - deleteStaleCommits(); - } - String segmentInfoSnapshotFilename = null; - try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { - SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); - - Collection localSegmentsPostRefresh = segmentInfos.files(true); - - List segmentInfosFiles = localSegmentsPostRefresh.stream() - .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) - .collect(Collectors.toList()); - Optional latestSegmentInfos = segmentInfosFiles.stream() - .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); - - if (latestSegmentInfos.isPresent()) { - // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain - // all the segments from last commit if they are merged away but not yet committed. - // Each metadata file in the remote segment store represents a commit and the following - // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed - // segments. - localSegmentsPostRefresh.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); - segmentInfosFiles.stream() - .filter(file -> !file.equals(latestSegmentInfos.get())) - .forEach(localSegmentsPostRefresh::remove); - - boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh); - if (uploadStatus) { - segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); - localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); - - remoteDirectory.uploadMetadata( - localSegmentsPostRefresh, - storeDirectory, - indexShard.getOperationPrimaryTerm(), - segmentInfos.getGeneration() - ); - localSegmentChecksumMap.keySet() - .stream() - .filter(file -> !localSegmentsPostRefresh.contains(file)) - .collect(Collectors.toSet()) - .forEach(localSegmentChecksumMap::remove); - OnSuccessfulSegmentsSync(); - final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); - indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); - } else { - shouldRetry = true; - } + String segmentInfoSnapshotFilename = null; + try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + + Collection localSegmentsPostRefresh = segmentInfos.files(true); + + List segmentInfosFiles = localSegmentsPostRefresh.stream() + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .collect(Collectors.toList()); + Optional latestSegmentInfos = segmentInfosFiles.stream() + .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); + + if (latestSegmentInfos.isPresent()) { + // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain + // all the segments from last commit if they are merged away but not yet committed. + // Each metadata file in the remote segment store represents a commit and the following + // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed + // segments. + localSegmentsPostRefresh.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); + segmentInfosFiles.stream() + .filter(file -> !file.equals(latestSegmentInfos.get())) + .forEach(localSegmentsPostRefresh::remove); + + // Create a map of file name to size and update the refresh segment tracker + Map currentLocalSizeMap = createSizeMap(localSegmentsPostRefresh); + segmentTracker.setLatestLocalFileNameLengthMap(currentLocalSizeMap); + + // Start the segments files upload + boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh, segmentTracker, currentLocalSizeMap); + if (newSegmentsUploadStatus) { + segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); + localSegmentsPostRefresh.add(segmentInfoSnapshotFilename); + + // Start metadata file upload + remoteDirectory.uploadMetadata( + localSegmentsPostRefresh, + storeDirectory, + indexShard.getOperationPrimaryTerm(), + segmentInfos.getGeneration() + ); + // Update latest uploaded segment files name in segment tracker + segmentTracker.setLatestUploadFiles(currentLocalSizeMap.keySet()); + // Update the remote refresh time and refresh seq no + updateRemoteRefreshTimeAndSeqNo(segmentTracker, refreshTimeMs, refreshSeqNo); + clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); + OnSuccessfulSegmentsSync(); + final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); + indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); + // At this point since we have uploaded new segments, segment infos and segment metadata file, + // along with marking minSeqNoToKeep, upload has succeeded completely. + shouldRetry = false; } - } catch (EngineException e) { - shouldRetry = true; - logger.warn("Exception while reading SegmentInfosSnapshot", e); - } finally { - try { - if (segmentInfoSnapshotFilename != null) { - storeDirectory.deleteFile(segmentInfoSnapshotFilename); - } - } catch (IOException e) { - logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); + } + } catch (EngineException e) { + logger.warn("Exception while reading SegmentInfosSnapshot", e); + } finally { + try { + if (segmentInfoSnapshotFilename != null) { + storeDirectory.deleteFile(segmentInfoSnapshotFilename); } + } catch (IOException e) { + logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e); } - } catch (IOException e) { - shouldRetry = true; - // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried - // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. - logger.warn("Exception while uploading new segments to the remote segment store", e); } + } catch (IOException e) { + // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried + // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. + logger.warn("Exception while uploading new segments to the remote segment store", e); } } catch (Throwable t) { - shouldRetry = true; logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); + } finally { + // Update the segment tracker with the final upload status as seen at the end + updateFinalUploadStatusInSegmentTracker(segmentTracker, shouldRetry == false, bytesBeforeUpload, startTimeInNS); } afterSegmentsSync(isRetry, shouldRetry); } + /** + * // Clears the stale files from the latest local segment checksum map. + * + * @param localSegmentsPostRefresh list of segment files present post refresh + */ + private void clearStaleFilesFromLocalSegmentChecksumMap(Collection localSegmentsPostRefresh) { + localSegmentChecksumMap.keySet() + .stream() + .filter(file -> !localSegmentsPostRefresh.contains(file)) + .collect(Collectors.toSet()) + .forEach(localSegmentChecksumMap::remove); + } + private void beforeSegmentsSync(boolean isRetry) { if (isRetry) { logger.info("Retrying to sync the segments to remote store"); @@ -293,9 +341,14 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s } // Visible for testing - boolean uploadNewSegments(Collection localFiles) throws IOException { + boolean uploadNewSegments( + Collection localSegmentsPostRefresh, + RemoteRefreshSegmentTracker segmentTracker, + Map sizeMap + ) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); - localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { + // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. + List filesToUpload = localSegmentsPostRefresh.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { try { return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); } catch (IOException e) { @@ -305,13 +358,31 @@ boolean uploadNewSegments(Collection localFiles) throws IOException { ); return true; } - }).forEach(file -> { + }).collect(Collectors.toList()); + + // Start tracking the upload bytes started + filesToUpload.forEach(file -> segmentTracker.addUploadBytesStarted(sizeMap.get(file))); + + // Starting the uploads now + filesToUpload.forEach(file -> { + boolean success = false; + long fileSize = sizeMap.get(file); try { + // Start upload to remote store remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); + // Upload succeeded + segmentTracker.addUploadBytesSucceeded(fileSize); + segmentTracker.addToLatestUploadFiles(file); + success = true; } catch (IOException e) { - uploadSuccess.set(false); // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); + } finally { + if (success == false) { + uploadSuccess.set(false); + // Upload failed + segmentTracker.addUploadBytesFailed(fileSize); + } } }); return uploadSuccess.get(); @@ -334,4 +405,70 @@ private void deleteStaleCommits() { logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); } } + + /** + * Updates the last refresh time and refresh seq no which is seen by local store. + */ + private void updateLocalRefreshTimeAndSeqNo() { + RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + segmentTracker.updateLocalRefreshTimeMs(pendingRefreshTimeMs); + segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1); + } + + /** + * Updates the last refresh time and refresh seq no which is seen by remote store. + */ + private void updateRemoteRefreshTimeAndSeqNo(RemoteRefreshSegmentTracker segmentTracker, long refreshTimeMs, long refreshSeqNo) { + segmentTracker.updateRemoteRefreshTimeMs(refreshTimeMs); + segmentTracker.updateRemoteRefreshSeqNo(refreshSeqNo); + } + + /** + * Returns map of file name to size of the input segment files. Tries to reuse existing information by caching the size + * data, otherwise uses {@code storeDirectory.fileLength(file)} to get the size. This method also removes from the map + * such files that are not present in the list of segment files given in the input. + * + * @param segmentFiles list of segment files for which size needs to be known + * @return the map as mentioned above + */ + private Map createSizeMap(Collection segmentFiles) { + // Create a map of file name to size + Map sizeMap = segmentFiles.stream() + .filter(file -> !EXCLUDE_FILES.contains(file)) + .collect(Collectors.toMap(Function.identity(), file -> { + if (latestFileNameSizeOnLocalMap.containsKey(file)) { + return latestFileNameSizeOnLocalMap.get(file); + } + long fileSize = 0; + try { + fileSize = storeDirectory.fileLength(file); + latestFileNameSizeOnLocalMap.put(file, fileSize); + } catch (IOException e) { + logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e); + } + return fileSize; + })); + // Remove keys from the fileSizeMap that do not exist in the latest segment files + latestFileNameSizeOnLocalMap.entrySet().removeIf(entry -> sizeMap.containsKey(entry.getKey()) == false); + return sizeMap; + } + + private void updateFinalUploadStatusInSegmentTracker( + RemoteRefreshSegmentTracker statsTracker, + boolean uploadStatus, + long bytesBeforeUpload, + long startTimeInNS + ) { + if (uploadStatus) { + long bytesUploaded = statsTracker.getUploadBytesSucceeded() - bytesBeforeUpload; + long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L; + + statsTracker.incrementTotalUploadsSucceeded(); + statsTracker.addUploadBytes(bytesUploaded); + statsTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / timeTakenInMS); + statsTracker.addUploadTimeMs(timeTakenInMS); + } else { + statsTracker.incrementTotalUploadsFailed(); + } + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index a5276350d582a..0dee977ad3042 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -69,6 +69,7 @@ import org.opensearch.index.mapper.SourceFieldMapper; import org.opensearch.index.mapper.TextFieldMapper; import org.opensearch.index.mapper.VersionFieldMapper; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -286,6 +287,7 @@ protected void configure() { bind(RetentionLeaseSyncer.class).asEagerSingleton(); bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); bind(SegmentReplicationPressureService.class).asEagerSingleton(); + bind(RemoteRefreshSegmentPressureService.class).asEagerSingleton(); } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b3843dfd114a9..cdad2c45638e5 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -120,6 +120,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.RetentionLeaseStats; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -941,14 +942,21 @@ public IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode + final DiscoveryNode sourceNode, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); + IndexShard indexShard = indexService.createShard( + shardRouting, + globalCheckpointSyncer, + retentionLeaseSyncer, + checkpointPublisher, + remoteRefreshSegmentPressureService + ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index f2a6583ae47bc..4a0fab82f9adc 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -56,6 +56,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.env.ShardLockObtainFailedException; @@ -64,6 +65,7 @@ import org.opensearch.index.IndexComponent; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -146,6 +148,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final SegmentReplicationCheckpointPublisher checkpointPublisher; + private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + @Inject public IndicesClusterStateService( final Settings settings, @@ -164,7 +168,8 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) { this( settings, @@ -183,7 +188,8 @@ public IndicesClusterStateService( snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, - retentionLeaseSyncer + retentionLeaseSyncer, + remoteRefreshSegmentPressureService ); } @@ -205,7 +211,8 @@ public IndicesClusterStateService( final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer + final RetentionLeaseSyncer retentionLeaseSyncer, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -215,6 +222,10 @@ public IndicesClusterStateService( ); indexEventListeners.add(segmentReplicationTargetService); indexEventListeners.add(segmentReplicationSourceService); + // if remote store feature is not enabled, do not wire the remote upload pressure service as an IndexEventListener. + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + indexEventListeners.add(remoteRefreshSegmentPressureService); + } this.segmentReplicationTargetService = segmentReplicationTargetService; this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); this.indicesService = indicesService; @@ -228,6 +239,7 @@ public IndicesClusterStateService( this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); + this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; } @Override @@ -657,7 +669,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR globalCheckpointSyncer, retentionLeaseSyncer, nodes.getLocalNode(), - sourceNode + sourceNode, + remoteRefreshSegmentPressureService ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1015,7 +1028,8 @@ T createShard( Consumer globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, - @Nullable DiscoveryNode sourceNode + @Nullable DiscoveryNode sourceNode, + RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException; /** diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index d412b5383bc89..cc7b5cb8dc845 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -77,6 +77,7 @@ import org.opensearch.index.mapper.Mapping; import org.opensearch.index.mapper.MetadataFieldMapper; import org.opensearch.index.mapper.RootObjectMapper; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -1072,6 +1073,7 @@ public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1102,6 +1104,7 @@ public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1132,6 +1135,7 @@ public void testHandlePrimaryTermValidationRequestSuccess() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1173,6 +1177,7 @@ private TransportShardBulkAction createAction() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), + mock(RemoteRefreshSegmentPressureService.class), mock(SystemIndices.class) ); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index c5a6c0323a6f9..529f65ad21083 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -149,7 +149,7 @@ public void testValidateSegmentUploadLag() { e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); assertTrue(e.getMessage().contains("failure_streak_count:11 min_consecutive_failure_threshold:10")); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); pressureService.validateSegmentsUploadLag(shardId); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java index 48bc28e3a497d..1510623050d40 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -240,9 +240,9 @@ public void testIncrementTotalUploadSucceeded() { pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() ); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(1, pressureTracker.getTotalUploadsSucceeded()); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(2, pressureTracker.getTotalUploadsSucceeded()); } @@ -257,7 +257,7 @@ public void testGetInflightUploads() { assertEquals(1, pressureTracker.getInflightUploads()); pressureTracker.incrementTotalUploadsStarted(); assertEquals(2, pressureTracker.getInflightUploads()); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(1, pressureTracker.getInflightUploads()); pressureTracker.incrementTotalUploadsFailed(); assertEquals(0, pressureTracker.getInflightUploads()); @@ -287,7 +287,7 @@ public void testGetConsecutiveFailureCount() { assertEquals(1, pressureTracker.getConsecutiveFailureCount()); pressureTracker.incrementTotalUploadsFailed(); assertEquals(2, pressureTracker.getConsecutiveFailureCount()); - pressureTracker.incrementTotalUploadSucceeded(); + pressureTracker.incrementTotalUploadsSucceeded(); assertEquals(0, pressureTracker.getConsecutiveFailureCount()); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 158a9e9fb2229..c5a1e6e9a13a3 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -18,10 +18,13 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.threadpool.ThreadPool; @@ -39,7 +42,9 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; + private ClusterService clusterService; private RemoteStoreRefreshListener remoteStoreRefreshListener; + private RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard = newStartedShard( @@ -51,7 +56,15 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { indexDocs(1, numberOfDocs); indexShard.refresh("test"); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, remoteRefreshSegmentPressureService); + remoteStoreRefreshListener.beforeRefresh(); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -62,11 +75,9 @@ private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @After public void tearDown() throws Exception { - if (indexShard != null) { - Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - closeShards(indexShard); - } + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + closeShards(indexShard); super.tearDown(); } @@ -308,8 +319,21 @@ private void mockIndexShardWithRetryAndScheduleRefresh( return indexShard.getEngine(); }).when(shard).getEngine(); - RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard); - refreshListener.afterRefresh(false); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService( + clusterService, + Settings.EMPTY + ); + when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); + when(shard.shardId()).thenReturn(indexShard.shardId()); + remoteRefreshSegmentPressureService.afterIndexShardCreated(shard); + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, remoteRefreshSegmentPressureService); + refreshListener.beforeRefresh(); + refreshListener.afterRefresh(true); } private static class TestFilterDirectory extends FilterDirectory { diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0989bf869f18e..213a22539971f 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -153,7 +153,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + null ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 0619e3e3f62a2..c8e0460758df1 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -46,6 +46,7 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -262,7 +263,8 @@ public MockIndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode + final DiscoveryNode sourceNode, + final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 22a5194b50f6d..6fa39fea4cbd9 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -581,7 +581,8 @@ private IndicesClusterStateService createIndicesClusterStateService( null, primaryReplicaSyncer, s -> {}, - RetentionLeaseSyncer.EMPTY + RetentionLeaseSyncer.EMPTY, + null ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 3f5ef4b824afa..a4d770983085d 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -171,6 +171,7 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; @@ -1901,7 +1902,8 @@ public void onFailure(final Exception e) { actionFilters ), RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + null ); Map actions = new HashMap<>(); final SystemIndices systemIndices = new SystemIndices(emptyMap()); @@ -1952,6 +1954,7 @@ public void onFailure(final Exception e) { mock(ShardStateAction.class), mock(ThreadPool.class) ), + mock(RemoteRefreshSegmentPressureService.class), new SystemIndices(emptyMap()) ); actions.put( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index bb3b016560fa7..b609bfc8811e6 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -56,6 +56,7 @@ import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; @@ -89,6 +90,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -169,6 +171,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, @@ -204,12 +207,14 @@ public void onFailure(ReplicationState state, ReplicationFailedException e, bool protected ThreadPool threadPool; protected long primaryTerm; + protected ClusterService clusterService; @Override public void setUp() throws Exception { super.setUp(); threadPool = setUpThreadPool(); primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards + clusterService = createClusterService(threadPool); failOnShardFailures(); } @@ -221,6 +226,7 @@ protected ThreadPool setUpThreadPool() { public void tearDown() throws Exception { try { tearDownThreadPool(); + clusterService.close(); } finally { super.tearDown(); } @@ -564,8 +570,13 @@ protected IndexShard newShard( Collections.emptyList(), clusterSettings ); - if (remoteStore == null && indexSettings.isRemoteStoreEnabled()) { - remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); + + RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = null; + if (indexSettings.isRemoteStoreEnabled()) { + if (remoteStore == null) { + remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); + } + remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, indexSettings.getSettings()); } final BiFunction translogFactorySupplier = (settings, shardRouting) -> { @@ -601,9 +612,13 @@ protected IndexShard newShard( breakerService, translogFactorySupplier, checkpointPublisher, - remoteStore + remoteStore, + remoteRefreshSegmentPressureService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); + if (remoteRefreshSegmentPressureService != null) { + remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + } success = true; } finally { if (success == false) {