Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Remote Store] Fix relocation failure due to transport receive timeout (
Browse files Browse the repository at this point in the history
opensearch-project#10761)

* [Remote Store] Fix relocation failure due to transport receive timeout

Signed-off-by: Ashish Singh <[email protected]>

* Fix existing extended shardIdle for remote backed shards

Signed-off-by: Ashish Singh <[email protected]>

* Incorporate PR review comments

Signed-off-by: Ashish Singh <[email protected]>

---------

Signed-off-by: Ashish Singh <[email protected]>
ashking94 authored and austintlee committed Oct 23, 2023
1 parent 5a984a9 commit 48e40dc
Showing 10 changed files with 75 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -509,4 +509,27 @@ public void testRestoreSnapshotToIndexWithSameNameDifferentUUID() throws Excepti
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
});
}

public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, InterruptedException {
internalCluster().startClusterManagerOnlyNode();
String primaryShardNode = internalCluster().startDataOnlyNodes(1).get(0);

createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureGreen(INDEX_NAME);
IndexShard indexShard = getIndexShard(primaryShardNode);
assertFalse(indexShard.isSearchIdleSupported());

String replicaShardNode = internalCluster().startDataOnlyNodes(1).get(0);
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
ensureGreen(INDEX_NAME);
assertFalse(indexShard.isSearchIdleSupported());

indexShard = getIndexShard(replicaShardNode);
assertFalse(indexShard.isSearchIdleSupported());
}
}
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
@@ -1024,6 +1024,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
if (this.isRemoteStoreEnabled) {
logger.warn("Search idle is not supported for remote backed indices");
}
if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) {
logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT'");
}
Original file line number Diff line number Diff line change
@@ -4425,7 +4425,6 @@ public final boolean isSearchIdle() {
}

/**
*
* Returns true if this shard supports search idle.
* <p>
* Indices using Segment Replication will ignore search idle unless there are no replicas.
@@ -4434,6 +4433,11 @@ public final boolean isSearchIdle() {
* a new set of segments.
*/
public final boolean isSearchIdleSupported() {
// If the index is remote store backed, then search idle is not supported. This is to ensure that async refresh
// task continues to upload to remote store periodically.
if (isRemoteTranslogEnabled()) {
return false;
}
return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
}

Original file line number Diff line number Diff line change
@@ -430,10 +430,10 @@ public String getTranslogUUID() {
* @return if the translog should be flushed
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo(
localCheckpointOfLastCommit + 1
).translogFileGeneration;
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
// This is the minimum seqNo that is referred in translog and considered for calculating translog size
long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1);
final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration;
if (translog.sizeInBytesByMinGen(minReferencedTranslogGeneration) < flushThreshold) {
return false;
}
/*
@@ -454,7 +454,7 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl
final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
localCheckpointTrackerSupplier.get().getProcessedCheckpoint() + 1
).translogFileGeneration;
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
return minReferencedTranslogGeneration < translogGenerationOfNewCommit
|| localCheckpointTrackerSupplier.get().getProcessedCheckpoint() == localCheckpointTrackerSupplier.get().getMaxSeqNo();
}

Original file line number Diff line number Diff line change
@@ -544,4 +544,9 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
}
}
}

@Override
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minSeqNoToKeep;
}
}
Original file line number Diff line number Diff line change
@@ -2034,4 +2034,8 @@ public static String createEmptyTranslog(
writer.close();
return uuid;
}

public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minUnrefCheckpointInLastCommit;
}
}
Original file line number Diff line number Diff line change
@@ -376,7 +376,8 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
transportService,
request.targetNode(),
recoverySettings,
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime),
shard.isRemoteTranslogEnabled()
);
handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
return Tuple.tuple(handler, recoveryTarget);
Original file line number Diff line number Diff line change
@@ -75,14 +75,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final AtomicLong requestSeqNoGenerator = new AtomicLong(0);
private final RetryableTransportClient retryableTransportClient;
private final RemoteSegmentFileChunkWriter fileChunkWriter;
private final boolean remoteStoreEnabled;

public RemoteRecoveryTargetHandler(
long recoveryId,
ShardId shardId,
TransportService transportService,
DiscoveryNode targetNode,
RecoverySettings recoverySettings,
Consumer<Long> onSourceThrottle
Consumer<Long> onSourceThrottle,
boolean remoteStoreEnabled
) {
this.transportService = transportService;
// It is safe to pass the retry timeout value here because RemoteRecoveryTargetHandler
@@ -111,6 +113,7 @@ public RemoteRecoveryTargetHandler(
requestSeqNoGenerator,
onSourceThrottle
);
this.remoteStoreEnabled = remoteStoreEnabled;
}

public DiscoveryNode targetNode() {
@@ -129,7 +132,13 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
if (remoteStoreEnabled) {
// If remote store is enabled, during the prepare_translog phase, translog is also downloaded on the
// target host along with incremental segments download.
retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader);
} else {
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
}
}

@Override
Original file line number Diff line number Diff line change
@@ -471,6 +471,15 @@ public void onReplicationFailure(
}
}

@Override
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met.
assertFalse(primary.isSearchIdleSupported());
assertTrue(primary.isSearchIdle());
assertTrue(primary.scheduledRefresh());
assertFalse(primary.hasRefreshPending());
}

private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException {
final Set<String> segmentsFileNames = Arrays.stream(shard.store().directory().listAll())
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
Original file line number Diff line number Diff line change
@@ -436,13 +436,17 @@ public void testShardIdleWithNoReplicas() throws Exception {
shards.startAll();
final IndexShard primary = shards.getPrimary();
shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
validateShardIdleWithNoReplicas(primary);
}
}

protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
}

/**
* here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh.
*/

0 comments on commit 48e40dc

Please sign in to comment.