From 3738de199780afea7efde48167bb75e0260d4a9c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 8 Mar 2022 15:06:30 -0500 Subject: [PATCH 1/5] Increase store ref before snapshotting index commit --- .../index/engine/InternalEngine.java | 34 ++++++++++++++++--- .../index/engine/InternalEngineTests.java | 15 ++++++-- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 4319529e92622..d68bca56408e0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2159,14 +2159,40 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En flush(false, true); logger.trace("finish flush for snapshot"); } - final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false); - return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit)); + store.incRef(); + boolean success = false; + try { + final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false); + final IndexCommitRef commitRef = new IndexCommitRef( + lastCommit, + () -> IOUtils.close(() -> releaseIndexCommit(lastCommit), store::decRef) + ); + success = true; + return commitRef; + } finally { + if (success == false) { + store.decRef(); + } + } } @Override public IndexCommitRef acquireSafeIndexCommit() throws EngineException { - final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true); - return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit)); + store.incRef(); + boolean success = false; + try { + final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true); + final IndexCommitRef commitRef = new IndexCommitRef( + safeCommit, + () -> IOUtils.close(() -> releaseIndexCommit(safeCommit), store::decRef) + ); + success = true; + return commitRef; + } finally { + if (success == false) { + store.decRef(); + } + } } private void releaseIndexCommit(IndexCommit snapshot) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b2ba370c8063f..01ae81164c5a8 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5666,6 +5666,7 @@ public void testAcquireIndexCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final Engine.IndexCommitRef snapshot; final boolean closeSnapshotBeforeEngine = randomBoolean(); + final int expectedDocs; try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { int numDocs = between(1, 20); for (int i = 0; i < numDocs; i++) { @@ -5681,6 +5682,7 @@ public void testAcquireIndexCommit() throws Exception { } else { snapshot = engine.acquireLastIndexCommit(flushFirst); } + expectedDocs = flushFirst && safeCommit == false ? numDocs : 0; int moreDocs = between(1, 20); for (int i = 0; i < moreDocs; i++) { index(engine, numDocs + i); @@ -5689,7 +5691,7 @@ public void testAcquireIndexCommit() throws Exception { engine.flush(); // check that we can still read the commit that we captured try (IndexReader reader = DirectoryReader.open(snapshot.getIndexCommit())) { - assertThat(reader.numDocs(), equalTo(flushFirst && safeCommit == false ? numDocs : 0)); + assertThat(reader.numDocs(), equalTo(expectedDocs)); } assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2)); @@ -5701,8 +5703,17 @@ public void testAcquireIndexCommit() throws Exception { } } + if (randomBoolean()) { + IOUtils.close(store); + } + if (closeSnapshotBeforeEngine == false) { - snapshot.close(); // shouldn't throw AlreadyClosedException + // check that we can still read the commit that we captured + try (DirectoryReader reader = DirectoryReader.open(snapshot.getIndexCommit())) { + assertThat(reader.numDocs(), equalTo(expectedDocs)); + } finally { + snapshot.close(); + } } } From d3710069c5ae5ce4fcb32f815165798f92868acd Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 8 Mar 2022 20:31:46 -0500 Subject: [PATCH 2/5] Fix tests --- .../xpack/ccr/repository/CcrRestoreSourceServiceTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index bf71e8a2bf59f..3612285334ae5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -167,8 +167,10 @@ public void testGetSessionReader() throws IOException { byte[] expectedBytes = new byte[(int) fileMetadata.length()]; byte[] actualBytes = new byte[(int) fileMetadata.length()]; - Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit(); - try (IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE)) { + try ( + Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit(); + IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE) + ) { indexInput.seek(0); indexInput.readBytes(expectedBytes, 0, (int) fileMetadata.length()); } From 6d5cbd17bc761f3b8095a1d6f7e6ffe0fa9305a4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 8 Mar 2022 21:34:09 -0500 Subject: [PATCH 3/5] Simplify usage --- .../index/shard/LocalShardSnapshot.java | 23 +++---------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index d3c5a814eae24..d475309065259 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -22,27 +22,16 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; final class LocalShardSnapshot implements Closeable { private final IndexShard shard; private final Store store; private final Engine.IndexCommitRef indexCommit; - private final AtomicBoolean closed = new AtomicBoolean(false); LocalShardSnapshot(IndexShard shard) { this.shard = shard; - store = shard.store(); - store.incRef(); - boolean success = false; - try { - indexCommit = shard.acquireLastIndexCommit(true); - success = true; - } finally { - if (success == false) { - store.decRef(); - } - } + this.store = shard.store(); + this.indexCommit = shard.acquireLastIndexCommit(true); } Index getIndex() { @@ -110,13 +99,7 @@ public void close() throws IOException { @Override public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - try { - indexCommit.close(); - } finally { - store.decRef(); - } - } + indexCommit.close(); } IndexMetadata getIndexMetadata() { From b49c6fde5aad6836967dd6e6023aaa42abcc9043 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 8 Mar 2022 22:32:30 -0500 Subject: [PATCH 4/5] Update docs/changelog/84776.yaml --- docs/changelog/84776.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/84776.yaml diff --git a/docs/changelog/84776.yaml b/docs/changelog/84776.yaml new file mode 100644 index 0000000000000..1b49ca90ff555 --- /dev/null +++ b/docs/changelog/84776.yaml @@ -0,0 +1,5 @@ +pr: 84776 +summary: Increase store ref before snapshotting index commit +area: Engine +type: enhancement +issues: [] From 87af64d13054301c6cc836332fd32883acd08c4a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 18 Mar 2022 09:39:52 -0400 Subject: [PATCH 5/5] Update docs/changelog/84776.yaml --- docs/changelog/84776.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/84776.yaml b/docs/changelog/84776.yaml index 1b49ca90ff555..75e5d91e8eb37 100644 --- a/docs/changelog/84776.yaml +++ b/docs/changelog/84776.yaml @@ -1,5 +1,5 @@ pr: 84776 summary: Increase store ref before snapshotting index commit area: Engine -type: enhancement +type: bug issues: []