Skip to content

Commit

Permalink
[Segment Replication] Handling resource close (#6795)
Browse files Browse the repository at this point in the history
* [Segment Replication] Fix resource close handling

Signed-off-by: Suraj Singh <[email protected]>

* Fix unit tests and change exception type

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored Mar 23, 2023
1 parent 4d78bbf commit e58a33d
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,6 @@ public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, listener);
}

public IndexShard indexShard() {
ensureRefCount();
return indexShard;
}

public String source() {
return sourceNode.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro
} else {
// From the checkpoint's shard ID, fetch the IndexShard
ShardId shardId = checkpoint.getShardId();
final IndexService indexService = indicesService.indexService(shardId.getIndex());
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
// build the CopyState object and cache it before returning
final CopyState copyState = new CopyState(checkpoint, indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.common.UUIDs;
Expand Down Expand Up @@ -213,9 +214,10 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
ActionListener.completeWith(listener, () -> {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
Store store = null;
try {
multiFileWriter.renameAllTempFiles();
final Store store = store();
store = store();
store.incRef();
// Deserialize the new SegmentInfos object sent from the primary.
final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();
Expand Down Expand Up @@ -250,6 +252,13 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
);
fail(rfe, true);
throw rfe;
} catch (OpenSearchException ex) {
/*
Ignore closed replication target as it can happen due to index shard closed event in a separate thread.
In such scenario, ignore the exception
*/
assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled";
logger.info("Replication target closed", ex);
} catch (Exception ex) {
ReplicationFailedException rfe = new ReplicationFailedException(
indexShard.shardId(),
Expand All @@ -259,7 +268,9 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
fail(rfe, true);
throw rfe;
} finally {
store.decRef();
if (store != null) {
store.decRef();
}
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
assert indicesService != null;
final IndexShard indexShard = indicesService.getShardOrNull(request.getShardId());
// Proceed with round of segment replication only when it is allowed
if (indexShard.getReplicationEngine().isEmpty()) {
if (indexShard == null || indexShard.getReplicationEngine().isEmpty()) {
logger.info("Ignore force segment replication sync as it is not allowed");
channel.sendResponse(TransportResponse.Empty.INSTANCE);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -186,7 +187,7 @@ public void fail(ReplicationFailedException e, boolean sendShardFailure) {

protected void ensureRefCount() {
if (refCount() <= 0) {
throw new ReplicationFailedException(
throw new OpenSearchException(
"ReplicationTarget is used but it's refcount is 0. Probably a mismatch between incRef/decRef calls"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void setUp() throws Exception {
// This mirrors the creation of the ReplicationCheckpoint inside CopyState
testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L);
IndexService mockIndexService = mock(IndexService.class);
when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService);
when(mockIndicesService.indexServiceSafe(testShardId.getIndex())).thenReturn(mockIndexService);
when(mockIndexService.getShard(testShardId.id())).thenReturn(primary);

TransportService transportService = mock(TransportService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void setUp() throws Exception {
ShardId testShardId = mockIndexShard.shardId();
IndicesService mockIndicesService = mock(IndicesService.class);
IndexService mockIndexService = mock(IndexService.class);
when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService);
when(mockIndicesService.indexServiceSafe(testShardId.getIndex())).thenReturn(mockIndexService);
when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard);

// This mirrors the creation of the ReplicationCheckpoint inside CopyState
Expand Down

0 comments on commit e58a33d

Please sign in to comment.