From 2b80149af12743aa74d993f5d5849c44cffc0325 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 5 Oct 2023 11:54:58 -0700 Subject: [PATCH] Fix edge case where flush failures would not get reported as corruption. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 28 +++++++++---------- .../index/engine/NRTReplicationEngine.java | 1 + .../replication/SegmentReplicationTarget.java | 11 +++----- .../engine/NRTReplicationEngineTests.java | 21 ++++++++++++++ .../SegmentReplicationTargetServiceTests.java | 3 +- .../SegmentReplicationTargetTests.java | 3 +- 6 files changed, 44 insertions(+), 23 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 0bb22ea99282e..81556cc270151 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -49,7 +49,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.Preference; -import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; @@ -1839,15 +1838,15 @@ public void testSendCorruptBytesToReplica() throws Exception { .setSource(jsonBuilder().startObject().field("field", i).endObject()) .get(); } - final long originalRecoveryTime = getRecoveryStopTime(); + final long originalRecoveryTime = getRecoveryStopTime(replicaNode); assertNotEquals(originalRecoveryTime, 0); refresh(INDEX_NAME); latch.await(); assertTrue(failed.get()); - waitForNewPeerRecovery(originalRecoveryTime); + waitForNewPeerRecovery(replicaNode, originalRecoveryTime); // reset checkIndex to ensure our original shard doesn't throw resetCheckIndexStatus(); - assertDocCounts(100, primaryNode, replicaNode); + waitForSearchableDocs(100, primaryNode, replicaNode); } public void testWipeSegmentBetweenSyncs() throws Exception { @@ -1866,7 +1865,7 @@ public void testWipeSegmentBetweenSyncs() throws Exception { final String replicaNode = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 10; i++) { client().prepareIndex(INDEX_NAME) .setId(String.valueOf(i)) .setSource(jsonBuilder().startObject().field("field", i).endObject()) @@ -1874,39 +1873,40 @@ public void testWipeSegmentBetweenSyncs() throws Exception { } refresh(INDEX_NAME); ensureGreen(INDEX_NAME); - final long originalRecoveryTime = getRecoveryStopTime(); + final long originalRecoveryTime = getRecoveryStopTime(replicaNode); final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); - waitForSearchableDocs(INDEX_NAME, 100, List.of(replicaNode)); + waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode)); indexShard.store().directory().deleteFile("_0.si"); - for (int i = 101; i < 201; i++) { + for (int i = 11; i < 21; i++) { client().prepareIndex(INDEX_NAME) .setId(String.valueOf(i)) .setSource(jsonBuilder().startObject().field("field", i).endObject()) .get(); } refresh(INDEX_NAME); - waitForNewPeerRecovery(originalRecoveryTime); + waitForNewPeerRecovery(replicaNode, originalRecoveryTime); resetCheckIndexStatus(); - assertDocCounts(200, primaryNode, replicaNode); + waitForSearchableDocs(20, primaryNode, replicaNode); } - private static void waitForNewPeerRecovery(long originalRecoveryTime) throws Exception { + private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception { assertBusy(() -> { // assert we have a peer recovery after the original - final long time = getRecoveryStopTime(); + final long time = getRecoveryStopTime(replicaNode); assertNotEquals(time, 0); assertNotEquals(originalRecoveryTime, time); }, 1, TimeUnit.MINUTES); } - private static long getRecoveryStopTime() { + private long getRecoveryStopTime(String nodeName) { final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get(); final List recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME); + logger.info("Recovery states {}", recoveryResponse); for (RecoveryState recoveryState : recoveryStates) { - if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER) { + if (recoveryState.getTargetNode().getName().equals(nodeName)) { return recoveryState.getTimer().stopTime(); } } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 8f76aa490a4b1..020e92aba4ce5 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -379,6 +379,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { try { commitSegmentInfos(); } catch (IOException e) { + maybeFailEngine("flush", e); throw new FlushFailedEngineException(shardId, e); } finally { flushLock.unlock(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 5ae480b7d63a4..0eb6ce36fa63d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -18,7 +18,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.opensearch.OpenSearchCorruptionException; -import org.opensearch.OpenSearchException; import org.opensearch.action.StepListener; import org.opensearch.common.UUIDs; import org.opensearch.common.lucene.Lucene; @@ -261,9 +260,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are - // broken. We have to clean up this shard entirely, remove all files and bubble it up to the - // source shard since this index might be broken there as well? The Source can handle this and checks - // its content on disk if possible. + // broken. We have to clean up this shard entirely, remove all files and bubble it up. try { try { store.removeCorruptionMarker(); @@ -279,14 +276,14 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) // In this case the shard is closed at some point while updating the reader. // This can happen when the engine is closed in a separate thread. logger.warn("Shard is already closed, closing replication"); - } catch (OpenSearchException ex) { + } catch (CancellableThreads.ExecutionCancelledException ex) { /* Ignore closed replication target as it can happen due to index shard closed event in a separate thread. In such scenario, ignore the exception */ - assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled"; + assert cancellableThreads.isCancelled() : "Replication target cancelled but cancellable threads not cancelled"; } catch (Exception ex) { - throw new OpenSearchCorruptionException(ex); + throw new ReplicationFailedException(ex); } finally { if (store != null) { store.decRef(); diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index a73d440b8494f..ee25d3789fb13 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -600,6 +600,27 @@ public void testCommitOnCloseThrowsException_decRefStore() throws Exception { assertThrows(RuntimeException.class, nrtEngineStore::close); } + public void testFlushThrowsFlushFailedExceptionOnCorruption() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS); + List operations = generateHistoryOnReplica( + randomIntBetween(1, 10), + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + indexOperations(nrtEngine, operations); + // wipe the nrt directory initially so we can sync with primary. + cleanAndCopySegmentsFromPrimary(nrtEngine); + nrtEngineStore.directory().deleteFile("_0.si"); + assertThrows(FlushFailedEngineException.class, nrtEngine::flush); + assertTrue(nrtEngineStore.isMarkedCorrupted()); + // store will throw when eventually closed, not handled here. + assertThrows(RuntimeException.class, nrtEngineStore::close); + } + private void copySegments(Collection latestPrimaryFiles, Engine nrtEngine) throws IOException { final Store store = nrtEngine.store; final List replicaFiles = List.of(store.directory().listAll()); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index c1f88a6938d33..c108de5ee5ea6 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.ClusterState; @@ -553,7 +554,7 @@ public void testForceSegmentSyncHandlerWithFailure() throws Exception { ).txGet(); }); Throwable nestedException = finalizeException.getCause().getCause(); - assertTrue(nestedException instanceof IOException); + assertNotNull(ExceptionsHelper.unwrap(finalizeException, IOException.class)); assertTrue(nestedException.getMessage().contains("dummy failure")); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 2596dd6e62026..a9d7d3cdd32fc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -249,7 +249,7 @@ public void onFailure(Exception e) { }); } - public void testFailure_finalizeReplication_IOException() throws IOException { + public void testFailure_finalizeReplication_NonCorruptionException() throws IOException { IOException exception = new IOException("dummy failure"); SegmentReplicationSource segrepSource = new TestReplicationSource() { @@ -288,6 +288,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { + assertEquals(ReplicationFailedException.class, e.getClass()); assertEquals(exception, e.getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); }