Skip to content

Commit

Permalink
SegRep with Remote: Add hook for publishing checkpoint notifications …
Browse files Browse the repository at this point in the history
…after segment upload to remote store (#7394) (#7718)

Signed-off-by: Ankit Kala <[email protected]>
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored May 24, 2023
1 parent 636e772 commit 92571b7
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498))
- Create NamedRoute to map extension routes to a shortened name ([#6870](https://github.com/opensearch-project/OpenSearch/pull/6870))
- Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665))
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,14 @@ public boolean isSegRepEnabled() {
return ReplicationType.SEGMENT.equals(replicationType);
}

public boolean isSegRepLocalEnabled() {
return isSegRepEnabled() && !isSegRepWithRemoteEnabled();
}

public boolean isSegRepWithRemoteEnabled() {
return isSegRepEnabled() && isRemoteStoreEnabled() && FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL);
}

/**
* Returns if remote store is enabled for this index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isSegRepWithRemoteEnabled()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
}
Expand Down
11 changes: 9 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -3554,9 +3554,16 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
internalRefreshListener.add(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY
)
);
}
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -96,7 +98,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres

private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry;

public RemoteStoreRefreshListener(IndexShard indexShard) {
private final SegmentReplicationCheckpointPublisher checkpointPublisher;

public RemoteStoreRefreshListener(IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher) {
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
Expand All @@ -111,6 +115,7 @@ public RemoteStoreRefreshListener(IndexShard indexShard) {
}
}
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
}

@Override
Expand Down Expand Up @@ -151,6 +156,10 @@ private synchronized void syncSegments(boolean isRetry) {
deleteStaleCommits();
}

// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Expand Down Expand Up @@ -190,9 +199,11 @@ private synchronized void syncSegments(boolean isRetry) {
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
OnSuccessfulSegmentsSync();
onSuccessfulSegmentsSync();
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
((InternalEngine) indexShard.getEngine()).translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);

checkpointPublisher.publish(indexShard, checkpoint);
} else {
shouldRetry = true;
}
Expand Down Expand Up @@ -229,7 +240,7 @@ private void beforeSegmentsSync(boolean isRetry) {
}
}

private void OnSuccessfulSegmentsSync() {
private void onSuccessfulSegmentsSync() {
// Reset the backoffDelayIterator for the future failures
resetBackOffDelayIterator();
// Cancel the scheduled cancellable retry if possible and set it to null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void setup(boolean primary, int numberOfDocs) throws IOException {
indexDocs(1, numberOfDocs);
indexShard.refresh("test");

remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY);
}

private void indexDocs(int startDocId, int numberOfDocs) throws IOException {
Expand Down Expand Up @@ -316,7 +317,7 @@ private void mockIndexShardWithRetryAndScheduleRefresh(
return indexShard.getEngine();
}).when(shard).getEngine();

RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard);
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY);
refreshListener.afterRefresh(false);
}

Expand Down

0 comments on commit 92571b7

Please sign in to comment.