From b6bbfe31fa86c80cc1a622d57e7c552af4a59b2b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 18 Mar 2022 12:08:08 -0400 Subject: [PATCH] Increase store ref before snapshotting index commit (#84776) (#85112) Snapshotted commits should also hold a reference to the store, so they are always usable; otherwise, callers need to manage the store's references manually. This change applies only to InternalEngine as we already do this in ReadOnlyEngine. --- docs/changelog/84776.yaml | 5 +++ .../index/engine/InternalEngine.java | 34 ++++++++++++++++--- .../index/shard/LocalShardSnapshot.java | 23 ++----------- .../index/engine/InternalEngineTests.java | 15 ++++++-- .../CcrRestoreSourceServiceTests.java | 6 ++-- 5 files changed, 55 insertions(+), 28 deletions(-) create mode 100644 docs/changelog/84776.yaml diff --git a/docs/changelog/84776.yaml b/docs/changelog/84776.yaml new file mode 100644 index 0000000000000..75e5d91e8eb37 --- /dev/null +++ b/docs/changelog/84776.yaml @@ -0,0 +1,5 @@ +pr: 84776 +summary: Increase store ref before snapshotting index commit +area: Engine +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1389b5bf0c6e7..e8c75b820d737 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2328,14 +2328,40 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En flush(false, true); logger.trace("finish flush for snapshot"); } - final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false); - return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit)); + store.incRef(); + boolean success = false; + try { + final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false); + final IndexCommitRef commitRef = new IndexCommitRef( + lastCommit, + () -> IOUtils.close(() -> releaseIndexCommit(lastCommit), store::decRef) + ); + success = true; + return commitRef; + } finally { + if (success == false) { + store.decRef(); + } + } } @Override public IndexCommitRef acquireSafeIndexCommit() throws EngineException { - final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true); - return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit)); + store.incRef(); + boolean success = false; + try { + final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true); + final IndexCommitRef commitRef = new IndexCommitRef( + safeCommit, + () -> IOUtils.close(() -> releaseIndexCommit(safeCommit), store::decRef) + ); + success = true; + return commitRef; + } finally { + if (success == false) { + store.decRef(); + } + } } private void releaseIndexCommit(IndexCommit snapshot) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index 0489cb29bc202..8056791198100 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -23,27 +23,16 @@ import java.io.IOException; import java.util.Collection; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; final class LocalShardSnapshot implements Closeable { private final IndexShard shard; private final Store store; private final Engine.IndexCommitRef indexCommit; - private final AtomicBoolean closed = new AtomicBoolean(false); LocalShardSnapshot(IndexShard shard) { this.shard = shard; - store = shard.store(); - store.incRef(); - boolean success = false; - try { - indexCommit = shard.acquireLastIndexCommit(true); - success = true; - } finally { - if (success == false) { - store.decRef(); - } - } + this.store = shard.store(); + this.indexCommit = shard.acquireLastIndexCommit(true); } Index getIndex() { @@ -117,13 +106,7 @@ public Set getPendingDeletions() throws IOException { @Override public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - try { - indexCommit.close(); - } finally { - store.decRef(); - } - } + indexCommit.close(); } IndexMetadata getIndexMetadata() { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 05150ea4fbf0a..a13b95b4db6d7 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -6105,6 +6105,7 @@ public void testAcquireIndexCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final Engine.IndexCommitRef snapshot; final boolean closeSnapshotBeforeEngine = randomBoolean(); + final int expectedDocs; try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { int numDocs = between(1, 20); for (int i = 0; i < numDocs; i++) { @@ -6120,6 +6121,7 @@ public void testAcquireIndexCommit() throws Exception { } else { snapshot = engine.acquireLastIndexCommit(flushFirst); } + expectedDocs = flushFirst && safeCommit == false ? numDocs : 0; int moreDocs = between(1, 20); for (int i = 0; i < moreDocs; i++) { index(engine, numDocs + i); @@ -6128,7 +6130,7 @@ public void testAcquireIndexCommit() throws Exception { engine.flush(); // check that we can still read the commit that we captured try (IndexReader reader = DirectoryReader.open(snapshot.getIndexCommit())) { - assertThat(reader.numDocs(), equalTo(flushFirst && safeCommit == false ? numDocs : 0)); + assertThat(reader.numDocs(), equalTo(expectedDocs)); } assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2)); @@ -6140,8 +6142,17 @@ public void testAcquireIndexCommit() throws Exception { } } + if (randomBoolean()) { + IOUtils.close(store); + } + if (closeSnapshotBeforeEngine == false) { - snapshot.close(); // shouldn't throw AlreadyClosedException + // check that we can still read the commit that we captured + try (DirectoryReader reader = DirectoryReader.open(snapshot.getIndexCommit())) { + assertThat(reader.numDocs(), equalTo(expectedDocs)); + } finally { + snapshot.close(); + } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index bf71e8a2bf59f..3612285334ae5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -167,8 +167,10 @@ public void testGetSessionReader() throws IOException { byte[] expectedBytes = new byte[(int) fileMetadata.length()]; byte[] actualBytes = new byte[(int) fileMetadata.length()]; - Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit(); - try (IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE)) { + try ( + Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit(); + IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE) + ) { indexInput.seek(0); indexInput.readBytes(expectedBytes, 0, (int) fileMetadata.length()); }