Skip to content

Commit

Permalink
Test opening engine after initial copy.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored and dreamer-89 committed Jun 28, 2022
1 parent f8aa20a commit 7b9ed28
Show file tree
Hide file tree
Showing 15 changed files with 361 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.Assert;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.engine.Segment;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {

private static final String INDEX_NAME = "test-idx-1";
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);

// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);

flushAndRefresh(INDEX_NAME);
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);
assertSegmentStats(REPLICA_COUNT);
}
}

private void assertSegmentStats(int numberOfReplicas) {
client().admin().indices().segments(new IndicesSegmentsRequest(), new ActionListener<>() {
@Override
public void onResponse(IndicesSegmentResponse indicesSegmentResponse) {

List<ShardSegments[]> segmentsByIndex = indicesSegmentResponse.getIndices()
.values()
.stream() // get list of IndexSegments
.flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
.map(IndexShardSegments::getShards) // get list of segments across replication group
.collect(Collectors.toList());

// There will be an entry in the list for each index.
for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {

// Separate Primary & replica shards ShardSegments.
final Map<Boolean, List<ShardSegments>> segmentListMap = Arrays.stream(replicationGroupSegments)
.collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1);
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();

// create a map of the primary's segments keyed by segment name, allowing us to compare the same segment found on
// replicas.
final Map<String, Segment> primarySegmentsMap = primaryShardSegments.getSegments()
.stream()
.collect(Collectors.toMap(Segment::getName, Function.identity()));
// For every replica, ensure that its segments are in the same state as on the primary.
// It is possible the primary has not cleaned up old segments that are not required on replicas, so we can't do a
// list comparison.
// This equality check includes search/committed properties on the Segment. Combined with docCount checks,
// this ensures the replica has correctly copied the latest segments and has all segments referenced by the latest
// commit point, even if they are not searchable.
assertEquals(
"There should be a ShardSegment entry for each replica in the replicationGroup",
numberOfReplicas,
replicaShardSegments.size()
);

for (ShardSegments shardSegment : replicaShardSegments) {
for (Segment replicaSegment : shardSegment.getSegments()) {
final Segment primarySegment = primarySegmentsMap.get(replicaSegment.getName());
assertEquals("Replica's segment should be identical to primary's version", replicaSegment, primarySegment);
}
}
}
}

@Override
public void onFailure(Exception e) {
Assert.fail("Error fetching segment stats");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class FeatureFlags {
* and false otherwise.
*/
public static boolean isEnabled(String featureFlagName) {
return "true".equalsIgnoreCase(System.getProperty(featureFlagName));
return true;
// return "true".equalsIgnoreCase(System.getProperty(featureFlagName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@ public class NRTReplicationEngine extends Engine {

public NRTReplicationEngine(EngineConfig engineConfig) {
super(engineConfig);
logger.info("Creating NRTReplicationEngine");
store.incRef();
NRTReplicationReaderManager readerManager = null;
try {
logger.info("NRTEngine dir files {}", Arrays.asList(store.directory().listAll()));
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
logger.info("NRTEngine files {}", lastCommittedSegmentInfos.files(true));
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
Expand Down Expand Up @@ -93,6 +96,8 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
this.lastCommittedSegmentInfos = infos;
rollTranslogGeneration();
}
logger.info("ADVANCING PROCESSED SEQ NO TO {}", seqNo);
logger.info("persisted seq no {}", localCheckpointTracker.getPersistedCheckpoint());
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

Expand Down
31 changes: 28 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -1396,6 +1396,15 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
* Returns the lastest Replication Checkpoint that shard received
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
if (isActive() == false) {
return new ReplicationCheckpoint(
shardId,
getOperationPrimaryTerm(),
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED
);
}
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return Optional.ofNullable(snapshot.get())
.map(
Expand Down Expand Up @@ -1944,7 +1953,12 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
*/
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
if (indexSettings.isSegRepEnabled() == false) {
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
} else {
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
}
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().skipTranslogRecovery();
Expand All @@ -1970,6 +1984,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
logger.info("ENGINE FACTORY TYPE {}", engineFactory.getClass());
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
Expand Down Expand Up @@ -2975,6 +2990,7 @@ public void startRecovery(
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
RecoveryListener recoveryListener,
SegmentReplicationTargetService segmentReplicationTargetService,
RepositoriesService repositoriesService,
Consumer<MappingMetadata> mappingUpdateConsumer,
IndicesService indicesService
Expand Down Expand Up @@ -3004,7 +3020,16 @@ public void startRecovery(
case PEER:
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
if (indexSettings.isSegRepEnabled()) {
// Start a "Recovery" using segment replication. This ensures the shard is tracked by the primary
// and started with the latest set of segments.
segmentReplicationTargetService.recoverShard(
this,
recoveryListener
);
} else {
recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
}
} catch (Exception e) {
failShard("corrupted preexisting index", e);
recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -1003,7 +1004,7 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
if (maxVersion == null || version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
Expand Down Expand Up @@ -847,6 +848,7 @@ public IndexShard createShard(
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final PeerRecoveryTargetService recoveryTargetService,
final RecoveryListener recoveryListener,
final SegmentReplicationTargetService segmentReplicationTargetService,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer,
Expand All @@ -867,7 +869,7 @@ public IndexShard createShard(
repositoriesService
);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, segmentReplicationTargetService, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
: "mapping update consumer only required by local shards recovery";
client.admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -120,6 +121,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final PeerRecoveryTargetService recoveryTargetService;
private final SegmentReplicationTargetService segmentReplicationTargetService;
private final ShardStateAction shardStateAction;
private final NodeMappingRefreshAction nodeMappingRefreshAction;

Expand Down Expand Up @@ -148,6 +150,7 @@ public IndicesClusterStateService(
final ClusterService clusterService,
final ThreadPool threadPool,
final PeerRecoveryTargetService recoveryTargetService,
final SegmentReplicationTargetService segmentReplicationTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
Expand All @@ -166,6 +169,7 @@ public IndicesClusterStateService(
threadPool,
checkpointPublisher,
recoveryTargetService,
segmentReplicationTargetService,
shardStateAction,
nodeMappingRefreshAction,
repositoriesService,
Expand All @@ -186,6 +190,7 @@ public IndicesClusterStateService(
final ThreadPool threadPool,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final PeerRecoveryTargetService recoveryTargetService,
final SegmentReplicationTargetService segmentReplicationTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
final RepositoriesService repositoriesService,
Expand All @@ -198,11 +203,12 @@ public IndicesClusterStateService(
) {
this.settings = settings;
this.checkpointPublisher = checkpointPublisher;
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, segmentReplicationTargetService, searchService, snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.recoveryTargetService = recoveryTargetService;
this.segmentReplicationTargetService = segmentReplicationTargetService;
this.shardStateAction = shardStateAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.repositoriesService = repositoriesService;
Expand Down Expand Up @@ -634,6 +640,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
checkpointPublisher,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm, this),
segmentReplicationTargetService,
repositoriesService,
failedShardHandler,
globalCheckpointSyncer,
Expand Down Expand Up @@ -992,6 +999,7 @@ T createShard(
SegmentReplicationCheckpointPublisher checkpointPublisher,
PeerRecoveryTargetService recoveryTargetService,
RecoveryListener recoveryListener,
SegmentReplicationTargetService segmentReplicationTargetService,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer,
Expand Down
Loading

0 comments on commit 7b9ed28

Please sign in to comment.