Skip to content

Commit

Permalink
[Segment Replication] Update peer recovery logic for segment replication
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Nov 22, 2022
1 parent 152f3f4 commit b413298
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Test] Add IAE test for deprecated edgeNGram analyzer name ([#5040](https://github.com/opensearch-project/OpenSearch/pull/5040))
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211))
- [Segment Replication] Update peer recovery logic for segment replication ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -701,7 +702,8 @@ public static final IndexShard newIndexShard(
cbs,
new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
null,
SegmentReplicationTargetService.NO_OP
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
Expand All @@ -23,9 +24,11 @@
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
Expand Down Expand Up @@ -53,6 +56,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -91,6 +95,110 @@ protected boolean addMockInternalEngine() {
return false;
}


/**
 * Upper bound passed as the cluster-health timeout while waiting for a shard relocation to finish.
 * Declared {@code static} because it is a constant shared by every test instance.
 */
private static final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);

/**
 * Indexes {@code docCount} documents into {@code INDEX_NAME} through a {@link BackgroundIndexer}, then refreshes the
 * index and waits for the replica to catch up before returning.
 *
 * @param docCount number of documents the background indexer is asked to index
 * @throws Exception if waiting for the documents or for the replica update fails
 */
public void ingestDocs(int docCount) throws Exception {
    // Constructor arguments are kept inline and in their original order so that any randomness consumed while
    // evaluating them is drawn in the same sequence.
    try (
        BackgroundIndexer backgroundIndexer = new BackgroundIndexer(
            INDEX_NAME,
            "_doc",
            client(),
            -1,
            RandomizedTest.scaledRandomIntBetween(2, 5),
            false,
            random()
        )
    ) {
        backgroundIndexer.start(docCount);
        waitForDocs(docCount, backgroundIndexer);

        // Make the freshly indexed documents searchable, then wait for the replica to receive them.
        refresh(INDEX_NAME);
        waitForReplicaUpdate();
    }
}

/**
 * Moves the primary of a single-shard, single-replica segment replication index onto a newly started node and
 * verifies that both the relocated primary and the replica return every document afterwards.
 * <p>
 * A random number of documents is indexed before the move and one more document is indexed after it. The index is
 * refreshed but deliberately never flushed before the relocation; the original author noted that the scenario
 * also passes when a flush is issued first.
 */
public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exception {
    logger.info("--> starting [primary] ...");
    final String sourceNode = internalCluster().startNode();

    logger.info("--> creating test index ...");
    final Settings.Builder indexSettings = Settings.builder()
        .put("index.number_of_shards", 1)
        .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
        .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
        .put("index.number_of_replicas", 1);
    prepareCreate(INDEX_NAME, indexSettings).get();

    final String replica = internalCluster().startNode();

    ensureGreen(INDEX_NAME);
    final int initialDocCount = scaledRandomIntBetween(0, 200);
    ingestDocs(initialDocCount);

    logger.info("--> verifying count {}", initialDocCount);
    for (String node : new String[] { sourceNode, replica }) {
        assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
    }

    // Intentionally no flush here: the relocation below has to succeed without one.

    logger.info("--> start another node");
    final String targetNode = internalCluster().startNode();
    final ClusterHealthResponse allNodesJoined = client().admin()
        .cluster()
        .prepareHealth()
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForNodes("3")
        .execute()
        .actionGet();
    assertThat(allNodesJoined.isTimedOut(), equalTo(false));

    logger.info("--> relocate the shard");

    client().admin()
        .cluster()
        .prepareReroute()
        .add(new MoveAllocationCommand(INDEX_NAME, 0, sourceNode, targetNode))
        .execute()
        .actionGet();
    final ClusterHealthResponse relocationFinished = client().admin()
        .cluster()
        .prepareHealth()
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForNoRelocatingShards(true)
        .setTimeout(ACCEPTABLE_RELOCATION_TIME)
        .execute()
        .actionGet();
    assertThat(relocationFinished.isTimedOut(), equalTo(false));

    // Index one more document through the relocated primary and let the replica pick it up.
    final int finalDocCount = 1;
    client().prepareIndex(INDEX_NAME).setId("20").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    refresh(INDEX_NAME);
    waitForReplicaUpdate();

    logger.info("--> verifying count again {} + {}", initialDocCount, finalDocCount);
    client().admin().indices().prepareRefresh().execute().actionGet();
    final int expectedDocCount = initialDocCount + finalDocCount;
    // Check the new primary first, then the replica; each check is retried until it passes or assertBusy gives up.
    for (String node : new String[] { targetNode, replica }) {
        assertBusy(
            () -> assertHitCount(
                client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
                expectedDocCount
            )
        );
    }
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -434,7 +435,8 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final SegmentReplicationTargetService segmentReplicationTargetService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -551,7 +553,8 @@ public synchronized IndexShard createShard(
// TODO Replace with remote translog factory in the follow up PR
this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(),
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
remoteStore,
segmentReplicationTargetService
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ public void beforeRefresh() throws IOException {

/**
 * Hands the shard to the checkpoint publisher after a refresh, provided the refresh actually happened, the shard is
 * not closed, and its replication tracker is in primary mode.
 *
 * @param didRefresh whether the refresh that just completed actually refreshed
 * @throws IOException declared by the overridden listener method
 */
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
// NOTE(review): the single-line condition below looks like the pre-change line of this diff hunk (the +/- markers
// were lost in extraction); it appears to be superseded by the multi-line condition that follows — confirm
// against the actual file before relying on this span.
if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) {
// Post-change condition: identical checks plus a guard that skips publishing while the shard reports that
// internal checkpoint refreshes are blocked.
if (didRefresh
&& shard.state() != IndexShardState.CLOSED
&& shard.getReplicationTracker().isPrimaryMode()
&& shard.isBlockInternalCheckPointRefresh() == false) {
publisher.publish(shard);
}
}
Expand Down
80 changes: 77 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest;
Expand Down Expand Up @@ -162,8 +163,11 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -308,6 +312,25 @@ Runnable getGlobalCheckpointSyncer() {
private final Store remoteStore;
private final TranslogFactory translogFactory;

/**
 * Used with segment replication only.
 *
 * While {@code true}, the refresh listener stops publishing segment replication checkpoints from this (existing)
 * primary to its replicas. This is done to ensure that files copied during peer recovery do not conflict with
 * segment files from the new primary after peer recovery.
 *
 * Declared {@code volatile} so that a write through the setter is visible to readers on other threads.
 * NOTE(review): presumably the flag is toggled by the recovery path and read by the refresh listener on a different
 * thread — confirm against the callers.
 */
private volatile boolean blockInternalCheckPointRefresh;

/**
 * Reports whether checkpoint publication after internal refreshes is currently blocked.
 *
 * @return {@code true} if publication is blocked, {@code false} otherwise
 */
public boolean isBlockInternalCheckPointRefresh() {
    return blockInternalCheckPointRefresh;
}

/**
 * Blocks or unblocks checkpoint publication after internal refreshes.
 *
 * @param blockInternalCheckPointRefresh {@code true} to block publication, {@code false} to allow it again
 */
public void setBlockInternalCheckPointRefresh(boolean blockInternalCheckPointRefresh) {
    this.blockInternalCheckPointRefresh = blockInternalCheckPointRefresh;
}

private final SegmentReplicationTargetService segmentReplicationTargetService;

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand All @@ -331,7 +354,8 @@ public IndexShard(
final CircuitBreakerService circuitBreakerService,
final TranslogFactory translogFactory,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
@Nullable final Store remoteStore,
@Nullable final SegmentReplicationTargetService segmentReplicationTargetService
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -417,6 +441,44 @@ public boolean shouldCache(Query query) {
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;

this.segmentReplicationTargetService = segmentReplicationTargetService;
}

/**
 * Used with segment replication only.
 *
 * Starts a segment replication round for this shard from an empty checkpoint. When the round completes, the engine
 * is reset to the global checkpoint while all shard operations are blocked. The method is meant to run on the
 * target of a primary relocation, so that the new primary holds the segment files that were previously copied to
 * the other replicas and does not conflict with them in later replication rounds.
 *
 * @param listener completed with {@code null} once replication finished and the engine was reset; failed with the
 *                 underlying exception if replication fails, or if blocking operations / resetting the engine is
 *                 interrupted, times out or hits an I/O error
 */
public void performSegmentReplicationRefresh(StepListener<Void> listener) {
    // The service is declared @Nullable on the constructor; fail with a clear message instead of a bare NPE.
    assert segmentReplicationTargetService != null : "segment replication target service must be set to refresh segments";
    this.segmentReplicationTargetService.startReplication(
        ReplicationCheckpoint.empty(this.shardId()),
        this,
        new SegmentReplicationTargetService.SegmentReplicationListener() {
            @Override
            public void onReplicationDone(SegmentReplicationState state) {
                try {
                    // Wait up to 30 minutes for in-flight operations to drain, then swap the engine.
                    indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
                    listener.onResponse(null);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    listener.onFailure(e);
                    throw new RuntimeException(e);
                } catch (TimeoutException | IOException e) {
                    listener.onFailure(e);
                    // NOTE(review): the listener has already been failed at this point; confirm that the
                    // replication service does not notify it a second time when it sees this exception.
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
                // Pass the exception as the throwable argument; a trailing "{}" placeholder would be logged
                // literally because the (String, Throwable) overload is selected.
                logger.error("segment replication failure post recovery", e);
                listener.onFailure(e);
                if (sendShardFailure) {
                    failShard("segment replication failure post recovery", e);
                }
            }
        }
    );
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3287,6 +3349,10 @@ private DocumentMapperForType docMapper() {
}

/**
 * Builds the engine configuration without forcing a read-write engine.
 * Delegates to {@code newEngineConfig(LongSupplier, boolean)} with the force flag set to {@code false}.
 *
 * @param globalCheckpointSupplier supplier of the global checkpoint, passed through unchanged
 * @return the engine configuration produced by the two-argument overload
 * @throws IOException if the two-argument overload throws it
 */
private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) throws IOException {
    final boolean forceReadWriteEngine = false;
    return newEngineConfig(globalCheckpointSupplier, forceReadWriteEngine);
}

private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier, boolean forceReadWriteEngine) throws IOException {
final Sort indexSort = indexSortSupplier.get();
final Engine.Warmer warmer = reader -> {
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
Expand All @@ -3305,6 +3371,14 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}

boolean isReadOnlyReplica = indexSettings.isSegRepEnabled() && shardRouting.primary() == false;
/**
* Recover relocating primary shard as replica with segment replication.
*/
if (shardRouting.isRelocationTarget() && shardRouting.primary() == true && forceReadWriteEngine == false) {
isReadOnlyReplica = true;
}

return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
Expand All @@ -3328,7 +3402,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
indexSettings.isSegRepEnabled() && shardRouting.primary() == false,
isReadOnlyReplica,
translogFactory
);
}
Expand Down Expand Up @@ -4118,7 +4192,7 @@ public void close() throws IOException {
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker, true)));
onNewEngine(newEngineReference.get());
}
final TranslogRecoveryRunner translogRunner = (snapshot) -> runTranslogRecovery(
Expand Down
12 changes: 10 additions & 2 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
Expand Down Expand Up @@ -851,14 +852,21 @@ public IndexShard createShard(
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final DiscoveryNode targetNode,
final DiscoveryNode sourceNode
final DiscoveryNode sourceNode,
final SegmentReplicationTargetService segmentReplicationTargetService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
IndexShard indexShard = indexService.createShard(
shardRouting,
globalCheckpointSyncer,
retentionLeaseSyncer,
checkpointPublisher,
segmentReplicationTargetService
);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
Loading

0 comments on commit b413298

Please sign in to comment.