Skip to content

Commit

Permalink
Disable shard idle with segment replication.
Browse files Browse the repository at this point in the history
This change disables shard idle when segment replication is enabled.
Primary shards will only push out new segments on refresh, we do not want to block this based on search behavior.
Replicas will only refresh on an externally provided SegmentInfos, so we do not want search requests to hang waiting for a refresh.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Aug 3, 2022
1 parent 6f23300 commit 5d353e8
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,18 @@ protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(Search
return readerManager;
}

/**
* Refreshing of this engine will only happen internally when a new set of segments is received. The engine will ignore external
* refresh attempts so we can return false here. Further Engine's existing implementation reads DirectoryReader.isCurrent after acquiring a searcher.
* With this Engine's NRTReplicationReaderManager, This will use StandardDirectoryReader's implementation which determines if the reader is current by
* comparing the on-disk SegmentInfos version against the one in the reader, which at refresh points will always return isCurrent false and then refreshNeeded true.
* Even if this method returns refresh as needed, we ignore it and only ever refresh with incoming SegmentInfos.
*/
@Override
public boolean refreshNeeded() {
return false;
}

@Override
public Closeable acquireHistoryRetentionLock() {
throw new UnsupportedOperationException("Not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3778,6 +3778,10 @@ public boolean scheduledRefresh() {
if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
&& isSearchIdle()
&& indexSettings.isExplicitRefresh() == false
&& indexSettings.isSegRepEnabled() == false
// Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only
// after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving
// a new set of segments.
&& active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.indices.replication.common.ReplicationType;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

private static final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();

public void testIgnoreShardIdle() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

final int numDocs = shards.indexDocs(randomInt(10));
primary.refresh("test");
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs);

primary.scheduledRefresh();
replica.scheduledRefresh();

primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));

// Update the search_idle setting, this will put both shards into search idle.
Settings updatedSettings = Settings.builder()
.put(settings)
.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO)
.build();
primary.indexSettings().getScopedSettings().applySettings(updatedSettings);
replica.indexSettings().getScopedSettings().applySettings(updatedSettings);

primary.scheduledRefresh();
replica.scheduledRefresh();

// Shards without segrep will register a new RefreshListener on the engine and return true when registered,
// assert with segrep enabled that awaitShardSearchActive does not register a listener.
primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ protected ReplicationGroup createGroup(int replicas, Settings settings) throws I
return new ReplicationGroup(metadata);
}

protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFactory engineFactory) throws IOException {
IndexMetadata metadata = buildIndexMetadata(replicas, settings, indexMapping);
return new ReplicationGroup(metadata) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return engineFactory;
}
};
}

protected IndexMetadata buildIndexMetadata(int replicas) throws IOException {
return buildIndexMetadata(replicas, indexMapping);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.junit.Assert;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.PlainActionFuture;
Expand All @@ -58,6 +64,7 @@
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.internal.io.IOUtils;
Expand All @@ -82,6 +89,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
Expand All @@ -94,7 +102,14 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.recovery.StartRecoveryRequest;
import org.opensearch.indices.replication.CheckpointInfoResponse;
import org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.IndexId;
Expand All @@ -112,6 +127,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -122,6 +138,7 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Mockito.mock;
import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;

/**
Expand Down Expand Up @@ -1133,4 +1150,117 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
}
};
}

/**
* Segment Replication specific test method - Replicate segments to a list of replicas from a given primary.
* This test will use a real {@link SegmentReplicationTarget} for each replica with a mock {@link SegmentReplicationSource} that
* writes all segments directly to the target.
*/
public final void replicateSegments(IndexShard primaryShard, List<IndexShard> replicaShards) throws IOException, InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size());
Store.MetadataSnapshot primaryMetadata;
try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) {
final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get();
primaryMetadata = primaryShard.store().getMetadata(primarySegmentInfos);
}
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard);

final ReplicationCollection<SegmentReplicationTarget> replicationCollection = new ReplicationCollection<>(logger, threadPool);
final SegmentReplicationSource source = new SegmentReplicationSource() {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
listener.onResponse(
new CheckpointInfoResponse(
copyState.getCheckpoint(),
copyState.getMetadataSnapshot(),
copyState.getInfosBytes(),
copyState.getPendingDeleteFiles()
)
);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
Store store,
ActionListener<GetSegmentFilesResponse> listener
) {
try (
final ReplicationCollection.ReplicationRef<SegmentReplicationTarget> replicationRef = replicationCollection.get(
replicationId
)
) {
writeFileChunks(replicationRef.get(), primaryShard, filesToFetch.toArray(new StoreFileMetadata[] {}));
} catch (IOException e) {
listener.onFailure(e);
}
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
}
};

for (IndexShard replica : replicaShards) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
ReplicationCheckpoint.empty(replica.shardId),
replica,
source,
new ReplicationListener() {
@Override
public void onDone(ReplicationState state) {
try (final GatedCloseable<SegmentInfos> snapshot = replica.getSegmentInfosSnapshot()) {
final SegmentInfos replicaInfos = snapshot.get();
final Store.MetadataSnapshot replicaMetadata = replica.store().getMetadata(replicaInfos);
final Store.RecoveryDiff recoveryDiff = primaryMetadata.recoveryDiff(replicaMetadata);
assertTrue(recoveryDiff.missing.isEmpty());
assertTrue(recoveryDiff.different.isEmpty());
assertEquals(recoveryDiff.identical.size(), primaryMetadata.size());
assertEquals(primaryMetadata.getCommitUserData(), replicaMetadata.getCommitUserData());
} catch (Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
countDownLatch.countDown();
}

@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
logger.error("Unexpected replication failure in test", e);
Assert.fail("test replication should not fail: " + e);
}
}
);
replicationCollection.start(target, TimeValue.timeValueMillis(5000));
target.startReplication(new ActionListener<>() {
@Override
public void onResponse(Void o) {
replicationCollection.markAsDone(target.getId());
}

@Override
public void onFailure(Exception e) {
replicationCollection.fail(target.getId(), new OpenSearchException("Segment Replication failed", e), true);
}
});
}
countDownLatch.await(3, TimeUnit.SECONDS);
}

private void writeFileChunks(SegmentReplicationTarget target, IndexShard primary, StoreFileMetadata[] files) throws IOException {
for (StoreFileMetadata md : files) {
try (IndexInput in = primary.store().directory().openInput(md.name(), IOContext.READONCE)) {
int pos = 0;
while (pos < md.length()) {
int length = between(1, Math.toIntExact(md.length() - pos));
byte[] buffer = new byte[length];
in.readBytes(buffer, 0, length);
target.writeFileChunk(md, pos, new BytesArray(buffer), pos + length == md.length(), 0, mock(ActionListener.class));
pos += length;
}
}
}
}
}

0 comments on commit 5d353e8

Please sign in to comment.