Skip to content

Commit

Permalink
Separate acquiring safe commit and last commit (#28271)
Browse files Browse the repository at this point in the history
Previously we introduced a new parameter to `acquireIndexCommit` to
allow acquiring either a safe commit or the last commit. However, with the
new parameter, callers can provide a nonsensical combination: flush first
but acquire the safe commit. This commit separates the `acquireIndexCommit`
method into two different methods to avoid that problem. Moreover, this
change should also improve readability.

Relates #28038
  • Loading branch information
dnhatn authored Feb 17, 2018
1 parent 3059862 commit 84fd39f
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 24 deletions.
10 changes: 7 additions & 3 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -868,13 +868,17 @@ public void forceMerge(boolean flush) throws IOException {
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;

/**
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
* Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*
* @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;

/**
* Snapshots the most recent safe index commit from the engine.
* <p>
* Unlike {@link #acquireLastIndexCommit(boolean)}, this method takes no <code>flushFirst</code> option.
*
* @return a handle to the acquired safe commit
* @throws EngineException declared for implementations; the conditions are implementation-specific
*/
public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;

/**
* fail engine due to some error. the engine will also be closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1719,16 +1719,22 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
}

@Override
public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// to a write lock when we fail the engine in this operation
if (flushFirst) {
logger.trace("start flush for snapshot");
flush(false, true);
logger.trace("finish flush for snapshot");
}
final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
return new Engine.IndexCommitRef(lastCommit, () -> combinedDeletionPolicy.releaseCommit(lastCommit));
}

/**
* Acquires the safe index commit from the combined deletion policy and wraps it in an
* {@link Engine.IndexCommitRef}. Unlike {@link #acquireLastIndexCommit(boolean)}, no flush is performed here.
*
* @return a reference to the acquired safe commit, paired with a callback that releases it
*/
@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
// "true" asks the deletion policy for the safe commit; acquireLastIndexCommit passes "false" for the last commit
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
// the callback hands the commit back to the deletion policy (presumably run when the ref is closed - confirm in IndexCommitRef)
return new Engine.IndexCommitRef(safeCommit, () -> combinedDeletionPolicy.releaseCommit(safeCommit));
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down
24 changes: 18 additions & 6 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1077,22 +1077,34 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
}

/**
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
*
* @param safeCommit <code>true</code> capture the most recent safe commit point; otherwise the most recent commit point.
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException {
final IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
return getEngine().acquireIndexCommit(safeCommit, flushFirst);
return getEngine().acquireLastIndexCommit(flushFirst);
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
}

/**
* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
* <p>
* The request is delegated to the engine only when the shard is in the <code>STARTED</code>, <code>RELOCATED</code>
* or <code>CLOSED</code> state.
*
* @return a handle to the safe commit acquired from the engine
* @throws IllegalIndexShardStateException if the shard is in any other state
* @throws EngineException declared by the delegated {@link Engine#acquireSafeIndexCommit()} call
*/
public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
return getEngine().acquireSafeIndexCommit();
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
}

/**
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
Expand Down Expand Up @@ -1121,7 +1133,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
return store.getMetadata(null, true);
}
}
indexCommit = engine.acquireIndexCommit(false, false);
indexCommit = engine.acquireLastIndexCommit(false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable {
store.incRef();
boolean success = false;
try {
indexCommit = shard.acquireIndexCommit(false, true);
indexCommit = shard.acquireLastIndexCommit(true);
success = true;
} finally {
if (success == false) {
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ final void ensureOpen() {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
Expand All @@ -262,7 +262,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
*
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(true, false);
phase1Snapshot = shard.acquireSafeIndexCommit();
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method.
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) {
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2121,7 +2121,7 @@ public void testConcurrentWritesAndCommits() throws Exception {
boolean doneIndexing;
do {
doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS);
commits.add(engine.acquireIndexCommit(false, true));
commits.add(engine.acquireLastIndexCommit(true));
if (commits.size() > commitLimit) { // don't keep on piling up too many commits
IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1)));
// we increase the wait time to make sure that, if things are slow, we eventually wait for threads to finish.
Expand Down Expand Up @@ -4355,19 +4355,24 @@ public void testAcquireIndexCommit() throws Exception {
}
final boolean flushFirst = randomBoolean();
final boolean safeCommit = randomBoolean();
Engine.IndexCommitRef commit = engine.acquireIndexCommit(safeCommit, flushFirst);
final Engine.IndexCommitRef snapshot;
if (safeCommit) {
snapshot = engine.acquireSafeIndexCommit();
} else {
snapshot = engine.acquireLastIndexCommit(flushFirst);
}
int moreDocs = between(1, 20);
for (int i = 0; i < moreDocs; i++) {
index(engine, numDocs + i);
}
globalCheckpoint.set(numDocs + moreDocs - 1);
engine.flush();
// check that we can still read the commit that we captured
try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) {
try (IndexReader reader = DirectoryReader.open(snapshot.getIndexCommit())) {
assertThat(reader.numDocs(), equalTo(flushFirst && (safeCommit == false || inSync) ? numDocs : 0));
}
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));
commit.close();
snapshot.close();
// check that it's cleaned up
engine.flush(true, true);
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
when(shard.state()).thenReturn(IndexShardState.RELOCATED);
when(shard.acquireIndexCommit(anyBoolean(), anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class));
doAnswer(invocation -> {
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ protected void snapshotShard(final IndexShard shard,
final Snapshot snapshot,
final Repository repository) throws IOException {
final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(false, true)) {
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
Index index = shard.shardId().getIndex();
IndexId indexId = new IndexId(index.getName(), index.getUUID());

Expand Down

0 comments on commit 84fd39f

Please sign in to comment.