Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "SegRep with Remote: Add hook for publishing checkpoint notifi… #7839

Merged
merged 1 commit into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498))
- Create NamedRoute to map extension routes to a shortened name ([#6870](https://github.com/opensearch-project/OpenSearch/pull/6870))
- Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665))
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))
- [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526))
- Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,14 +1030,6 @@ public boolean isSegRepEnabled() {
return ReplicationType.SEGMENT.equals(replicationType);
}

public boolean isSegRepLocalEnabled() {
return isSegRepEnabled() && !isSegRepWithRemoteEnabled();
}

public boolean isSegRepWithRemoteEnabled() {
return isSegRepEnabled() && isRemoteStoreEnabled() && FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL);
}

/**
* Returns if remote store is enabled for this index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isSegRepWithRemoteEnabled()) {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3552,16 +3552,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
)
new RemoteStoreRefreshListener(this, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()))
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -108,15 +107,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
*/
private final Map<String, Long> latestFileNameSizeOnLocalMap = ConcurrentCollections.newConcurrentMap();

private final SegmentReplicationCheckpointPublisher checkpointPublisher;

private final FileUploader fileUploader;

public RemoteStoreRefreshListener(
IndexShard indexShard,
SegmentReplicationCheckpointPublisher checkpointPublisher,
RemoteRefreshSegmentTracker segmentTracker
) {
public RemoteStoreRefreshListener(IndexShard indexShard, RemoteRefreshSegmentTracker segmentTracker) {
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
Expand All @@ -132,7 +125,6 @@ public RemoteStoreRefreshListener(
}
this.segmentTracker = segmentTracker;
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
this.fileUploader = new FileUploader(new UploadTracker() {
@Override
public void beforeUpload(String file) {
Expand Down Expand Up @@ -245,7 +237,6 @@ private synchronized void syncSegments(boolean isRetry) {
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo);
((InternalEngine) indexShard.getEngine()).translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
checkpointPublisher.publish(indexShard, checkpoint);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
shouldRetry = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -71,7 +70,6 @@ public void setup(boolean primary, int numberOfDocs) throws IOException {
remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(
indexShard,
SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())
);
}
Expand Down Expand Up @@ -415,7 +413,6 @@ private Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> m
remoteRefreshSegmentPressureService.afterIndexShardCreated(shard);
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(
shard,
SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())
);
refreshListener.afterRefresh(true);
Expand Down