From e2b931e80b8b4134f94e6eef8bfb9cfe31a571a5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 28 Aug 2018 10:44:15 -0400 Subject: [PATCH] Use Lucene history in primary-replica resync (#33178) This commit makes primary-replica resyncer use Lucene as the source of history operation instead of translog if soft-deletes is enabled. With this change, we no longer expose translog snapshot directly in IndexShard. Relates #29530 --- .../elasticsearch/index/engine/Engine.java | 6 ------ .../index/engine/InternalEngine.java | 5 ----- .../elasticsearch/index/shard/IndexShard.java | 9 -------- .../index/shard/PrimaryReplicaSyncer.java | 3 +-- .../shard/PrimaryReplicaSyncerTests.java | 21 ++++++++++++------- .../action/bulk/BulkShardOperationsTests.java | 2 +- 6 files changed, 15 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 020dac78d49e2..65d289bedd4f6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -592,12 +592,6 @@ public enum SearcherScope { */ public abstract Closeable acquireRetentionLockForPeerRecovery(); - /** - * Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range. - * The caller has to close the returned snapshot after finishing the reading. - */ - public abstract Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException; - public abstract TranslogStats getTranslogStats(); /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1baaaf2e1b14d..11acf22d338d1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -480,11 +480,6 @@ public void syncTranslog() throws IOException { revisitIndexDeletionPolicyOnTranslogSynced(); } - @Override - public Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException { - return getTranslog().newSnapshotFromMinSeqNo(minSeqNo); - } - /** * Creates a new history snapshot for reading operations since the provided seqno. * The returned snapshot can be retrieved from either Lucene index or translog files. diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index aabdd742303e7..62518dfc9b828 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1635,15 +1635,6 @@ public Closeable acquireRetentionLockForPeerRecovery() { return getEngine().acquireRetentionLockForPeerRecovery(); } - /** - * Creates a new translog snapshot for reading translog operations whose seq# at least the provided seq#. - * The caller has to close the returned snapshot after finishing the reading. - */ - public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { - // TODO: Remove this method after primary-replica resync use soft-deletes - return getEngine().newSnapshotFromMinSeqNo(minSeqNo); - } - /** * Returns the estimated number of history operations whose seq# at least the provided seq# in this shard. */ diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 909039cea4b76..016a8afff6964 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -89,8 +89,7 @@ public void resync(final IndexShard indexShard, final ActionListener // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - // TODO: A follow-up to make resync using soft-deletes - snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); + snapshot = indexShard.getHistoryOperations("resync", startingSeqNo); final Translog.Snapshot originalSnapshot = snapshot; final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index ae2cc84e4870c..29b16ca28f4da 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -106,17 +106,22 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { .isPresent(), is(false)); } - - assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, resyncTask.getTotalOperations()); if (syncNeeded && globalCheckPoint < numDocs - 1) { - long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included - assertEquals(skippedOps, resyncTask.getSkippedOperations()); - assertEquals(numDocs - skippedOps, resyncTask.getResyncedOperations()); + if (shard.indexSettings.isSoftDeleteEnabled()) { + assertThat(resyncTask.getSkippedOperations(), equalTo(0)); + assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations())); + assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint))); + } else { + int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included + assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); + assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); + assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); + } } else { - assertEquals(0, resyncTask.getSkippedOperations()); - assertEquals(0, resyncTask.getResyncedOperations()); + assertThat(resyncTask.getSkippedOperations(), equalTo(0)); + assertThat(resyncTask.getResyncedOperations(), equalTo(0)); + assertThat(resyncTask.getTotalOperations(), equalTo(0)); } - closeShards(shard); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index fa06abe65ba6a..4c6c0c060e45a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -61,7 +61,7 @@ public void testPrimaryTermFromFollower() throws IOException { final TransportWriteAction.WritePrimaryResult result = TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger); - try (Translog.Snapshot snapshot = followerPrimary.newTranslogSnapshotFromMinSeqNo(0)) { + try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) { assertThat(snapshot.totalOperations(), equalTo(operations.size())); Translog.Operation operation; while ((operation = snapshot.next()) != null) {