Skip to content

Commit

Permalink
Increase store ref before snapshotting index commit (#84776)
Browse files Browse the repository at this point in the history
Snapshotted commits should also hold a reference to the store, so they 
are always usable; otherwise, callers need to manage the store's 
references manually. This change applies only to InternalEngine as we
already do this in ReadOnlyEngine.
  • Loading branch information
dnhatn authored Mar 18, 2022
1 parent 5272689 commit b826516
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 28 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/84776.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84776
summary: Increase store ref before snapshotting index commit
area: Engine
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -2169,14 +2169,40 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
flush(false, true);
logger.trace("finish flush for snapshot");
}
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit));
store.incRef();
boolean success = false;
try {
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
final IndexCommitRef commitRef = new IndexCommitRef(
lastCommit,
() -> IOUtils.close(() -> releaseIndexCommit(lastCommit), store::decRef)
);
success = true;
return commitRef;
} finally {
if (success == false) {
store.decRef();
}
}
}

@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit));
store.incRef();
boolean success = false;
try {
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
final IndexCommitRef commitRef = new IndexCommitRef(
safeCommit,
() -> IOUtils.close(() -> releaseIndexCommit(safeCommit), store::decRef)
);
success = true;
return commitRef;
} finally {
if (success == false) {
store.decRef();
}
}
}

private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,16 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

final class LocalShardSnapshot implements Closeable {
private final IndexShard shard;
private final Store store;
private final Engine.IndexCommitRef indexCommit;
private final AtomicBoolean closed = new AtomicBoolean(false);

LocalShardSnapshot(IndexShard shard) {
this.shard = shard;
store = shard.store();
store.incRef();
boolean success = false;
try {
indexCommit = shard.acquireLastIndexCommit(true);
success = true;
} finally {
if (success == false) {
store.decRef();
}
}
this.store = shard.store();
this.indexCommit = shard.acquireLastIndexCommit(true);
}

Index getIndex() {
Expand Down Expand Up @@ -110,13 +99,7 @@ public void close() throws IOException {

@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
indexCommit.close();
} finally {
store.decRef();
}
}
indexCommit.close();
}

IndexMetadata getIndexMetadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5664,6 +5664,7 @@ public void testAcquireIndexCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final Engine.IndexCommitRef snapshot;
final boolean closeSnapshotBeforeEngine = randomBoolean();
final int expectedDocs;
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
int numDocs = between(1, 20);
for (int i = 0; i < numDocs; i++) {
Expand All @@ -5679,6 +5680,7 @@ public void testAcquireIndexCommit() throws Exception {
} else {
snapshot = engine.acquireLastIndexCommit(flushFirst);
}
expectedDocs = flushFirst && safeCommit == false ? numDocs : 0;
int moreDocs = between(1, 20);
for (int i = 0; i < moreDocs; i++) {
index(engine, numDocs + i);
Expand All @@ -5687,7 +5689,7 @@ public void testAcquireIndexCommit() throws Exception {
engine.flush();
// check that we can still read the commit that we captured
try (IndexReader reader = DirectoryReader.open(snapshot.getIndexCommit())) {
assertThat(reader.numDocs(), equalTo(flushFirst && safeCommit == false ? numDocs : 0));
assertThat(reader.numDocs(), equalTo(expectedDocs));
}
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));

Expand All @@ -5699,8 +5701,17 @@ public void testAcquireIndexCommit() throws Exception {
}
}

if (randomBoolean()) {
IOUtils.close(store);
}

if (closeSnapshotBeforeEngine == false) {
snapshot.close(); // shouldn't throw AlreadyClosedException
// check that we can still read the commit that we captured
try (DirectoryReader reader = DirectoryReader.open(snapshot.getIndexCommit())) {
assertThat(reader.numDocs(), equalTo(expectedDocs));
} finally {
snapshot.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,10 @@ public void testGetSessionReader() throws IOException {

byte[] expectedBytes = new byte[(int) fileMetadata.length()];
byte[] actualBytes = new byte[(int) fileMetadata.length()];
Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit();
try (IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE)) {
try (
Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit();
IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE)
) {
indexInput.seek(0);
indexInput.readBytes(expectedBytes, 0, (int) fileMetadata.length());
}
Expand Down

0 comments on commit b826516

Please sign in to comment.