From 8df5cc315b878bc182717490d2aa0d0611e0fce8 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Wed, 31 May 2023 21:58:21 +0530 Subject: [PATCH] Revert "SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store (#7394) (#7718)" This reverts commit 92571b75256428d037569d52df9a6b76294a9330. Signed-off-by: Ankit Kala --- CHANGELOG.md | 1 - .../main/java/org/opensearch/index/IndexSettings.java | 8 -------- .../index/shard/CheckpointRefreshListener.java | 5 +---- .../java/org/opensearch/index/shard/IndexShard.java | 10 ++-------- .../index/shard/RemoteStoreRefreshListener.java | 11 +---------- .../index/shard/RemoteStoreRefreshListenerTests.java | 2 -- 6 files changed, 4 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d960579f2bd82..6e4e3f1f79eaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498)) - Create NamedRoute to map extension routes to a shortened name ([#6870](https://github.com/opensearch-project/OpenSearch/pull/6870)) - Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665)) -- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394)) - [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526)) - Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420)) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 725f7c5a7ff3c..1d46125281ced 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1030,14 +1030,6 @@ public boolean isSegRepEnabled() { return ReplicationType.SEGMENT.equals(replicationType); } - public boolean isSegRepLocalEnabled() { - return isSegRepEnabled() && !isSegRepWithRemoteEnabled(); - } - - public boolean isSegRepWithRemoteEnabled() { - return isSegRepEnabled() && isRemoteStoreEnabled() && FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL); - } - /** * Returns if remote store is enabled for this index. */ diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 4254586f3d70e..66d095878d123 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,10 +40,7 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh - && shard.state() == IndexShardState.STARTED - && shard.getReplicationTracker().isPrimaryMode() - && !shard.indexSettings.isSegRepWithRemoteEnabled()) { + if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) { publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 668a7469bfec5..5270fd570e884 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3552,16 +3552,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); if (isRemoteStoreEnabled()) { internalRefreshListener.add( - new RemoteStoreRefreshListener( - this, - // Add the checkpoint publisher if the Segment Replciation via remote store is enabled. - indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()) - ) + new RemoteStoreRefreshListener(this, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())) ); } - - if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { + if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); } /** diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 666c7a26ccd8d..fc54e5210b602 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -31,7 +31,6 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -108,15 +107,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres */ private final Map latestFileNameSizeOnLocalMap = ConcurrentCollections.newConcurrentMap(); - private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final FileUploader fileUploader; - public RemoteStoreRefreshListener( - IndexShard indexShard, - SegmentReplicationCheckpointPublisher checkpointPublisher, - RemoteRefreshSegmentTracker segmentTracker - ) { + public RemoteStoreRefreshListener(IndexShard indexShard, RemoteRefreshSegmentTracker segmentTracker) { this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -132,7 +125,6 @@ public RemoteStoreRefreshListener( } this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); - this.checkpointPublisher = checkpointPublisher; this.fileUploader = new FileUploader(new UploadTracker() { @Override public void beforeUpload(String file) { @@ -245,7 +237,6 @@ private synchronized void syncSegments(boolean isRetry) { clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo); ((InternalEngine) indexShard.getEngine()).translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); - checkpointPublisher.publish(indexShard, checkpoint); // At this point since we have uploaded new segments, segment infos and segment metadata file, // along with marking minSeqNoToKeep, upload has succeeded completely. shouldRetry = false; diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 30809092471de..3473842b85c1b 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -29,7 +29,6 @@ import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; @@ -71,7 +70,6 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); remoteStoreRefreshListener = new RemoteStoreRefreshListener( indexShard, - SegmentReplicationCheckpointPublisher.EMPTY, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) ); }