Skip to content

Commit

Permalink
Move shard failure after send failure into SegmentFileTransferHandler.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Jun 11, 2022
1 parent 2bfbf4a commit ec397f1
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public RecoverySourceHandler(
logger,
threadPool,
cancellableThreads,
this::failEngine,
fileChunkSizeInBytes,
maxConcurrentFileChunks
);
Expand Down Expand Up @@ -1050,8 +1049,4 @@ private void cleanFiles(
}))
);
}

protected void failEngine(IOException cause) {
shard.failShard("recovery", cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Comparator;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import java.util.function.IntSupplier;

/**
Expand All @@ -57,7 +56,6 @@ public final class SegmentFileTransferHandler {
private final int maxConcurrentFileChunks;
private final DiscoveryNode targetNode;
private final CancellableThreads cancellableThreads;
private final Consumer<IOException> onCorruptException;

public SegmentFileTransferHandler(
IndexShard shard,
Expand All @@ -66,7 +64,6 @@ public SegmentFileTransferHandler(
Logger logger,
ThreadPool threadPool,
CancellableThreads cancellableThreads,
Consumer<IOException> onCorruptException,
int fileChunkSizeInBytes,
int maxConcurrentFileChunks
) {
Expand All @@ -76,7 +73,6 @@ public SegmentFileTransferHandler(
this.logger = logger;
this.threadPool = threadPool;
this.cancellableThreads = cancellableThreads;
this.onCorruptException = onCorruptException;
this.chunkSizeInBytes = fileChunkSizeInBytes;
// if the target is on an old version, it won't be able to handle out-of-order file chunks.
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
Expand Down Expand Up @@ -184,7 +180,7 @@ public void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[]
if (localException == null) {
localException = corruptIndexException;
}
onCorruptException.accept(corruptIndexException);
shard.failShard("error sending files", corruptIndexException);
}
}
if (localException != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.opensearch.transport.Transports;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -84,7 +83,6 @@ class SegmentReplicationSourceHandler {
logger,
threadPool,
cancellableThreads,
this::failEngine,
fileChunkSizeInBytes,
maxConcurrentFileChunks
);
Expand Down Expand Up @@ -156,10 +154,6 @@ public void cancel(String reason) {
cancellableThreads.cancel(reason);
}

private void failEngine(IOException e) {
shard.failShard("Failed Replication", e);
}

CopyState getCopyState() {
return copyState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,11 @@ public void writeFileChunk(
};
IndexShard mockShard = mock(IndexShard.class);
when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0));
doAnswer(invocation -> {
assertFalse(failedEngine.get());
failedEngine.set(true);
return null;
}).when(mockShard).failShard(any(), any());
RecoverySourceHandler handler = new RecoverySourceHandler(
mockShard,
new AsyncRecoveryTarget(target, recoveryExecutor),
Expand All @@ -555,13 +560,7 @@ public void writeFileChunk(
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
between(1, 8),
between(1, 8)
) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
failedEngine.set(true);
}
};
);
SetOnce<Exception> sendFilesError = new SetOnce<>();
CountDownLatch latch = new CountDownLatch(1);
handler.sendFiles(
Expand Down Expand Up @@ -623,6 +622,11 @@ public void writeFileChunk(
};
IndexShard mockShard = mock(IndexShard.class);
when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0));
doAnswer(invocation -> {
assertFalse(failedEngine.get());
failedEngine.set(true);
return null;
}).when(mockShard).failShard(any(), any());
RecoverySourceHandler handler = new RecoverySourceHandler(
mockShard,
new AsyncRecoveryTarget(target, recoveryExecutor),
Expand All @@ -631,13 +635,7 @@ public void writeFileChunk(
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
between(1, 10),
between(1, 4)
) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
failedEngine.set(true);
}
};
);
PlainActionFuture<Void> sendFilesFuture = new PlainActionFuture<>();
handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture);
Exception ex = expectThrows(Exception.class, sendFilesFuture::actionGet);
Expand Down

0 comments on commit ec397f1

Please sign in to comment.