Skip to content

Commit

Permalink
Primary send safe commit in file-based recovery (#28038)
Browse files Browse the repository at this point in the history
Today a primary shard transfers the most recent commit point to a 
replica shard in a file-based recovery. However, the most recent commit
may not be a "safe" commit; this causes a replica shard not having a
safe commit point until it can retain a safe commit by itself.

This commits collapses the snapshot deletion policy into the combined 
deletion policy and modifies the peer recovery source to send a safe
commit.

Relates #10708
  • Loading branch information
dnhatn authored Jan 11, 2018
1 parent 39ff7b5 commit 626c3d1
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@

package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.ObjectIntHashMap;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
Expand All @@ -42,12 +45,16 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) {
this.openMode = openMode;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.snapshottedCommits = new ObjectIntHashMap<>();
}

@Override
Expand All @@ -70,18 +77,22 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
}

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
lastCommit = commits.get(commits.size() - 1);
safeCommit = commits.get(keptPosition);
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
commits.get(i).delete();
}
}
updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1));
updateTranslogDeletionPolicy();
}

private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException {
assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted";
final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

private void updateTranslogDeletionPolicy() throws IOException {
assert Thread.holdsLock(this);
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

Expand All @@ -90,6 +101,34 @@ private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, f
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
}

/**
* Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}.
* Index files of the capturing commit point won't be released until the commit reference is closed.
*
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
*/
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
assert safeCommit != null : "Safe commit is not initialized yet";
assert lastCommit != null : "Last commit is not initialized yet";
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
snapshottedCommits.addTo(snapshotting, 1); // increase refCount
return new SnapshotIndexCommit(snapshotting);
}

/**
* Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}.
*/
synchronized void releaseCommit(final IndexCommit snapshotCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
final int refCount = snapshottedCommits.addTo(releasingCommit, -1); // release refCount
assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]";
if (refCount == 0) {
snapshottedCommits.remove(releasingCommit);
}
}

/**
* Find a safe commit point from a list of existing commits based on the supplied global checkpoint.
* The max sequence number of a safe commit point should be at most the global checkpoint.
Expand Down Expand Up @@ -139,4 +178,60 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
*/
return 0;
}

/**
* A wrapper of an index commit that prevents it from being deleted.
*/
private static class SnapshotIndexCommit extends IndexCommit {
private final IndexCommit delegate;

SnapshotIndexCommit(IndexCommit delegate) {
this.delegate = delegate;
}

@Override
public String getSegmentsFileName() {
return delegate.getSegmentsFileName();
}

@Override
public Collection<String> getFileNames() throws IOException {
return delegate.getFileNames();
}

@Override
public Directory getDirectory() {
return delegate.getDirectory();
}

@Override
public void delete() {
throw new UnsupportedOperationException("A snapshot commit does not support deletion");
}

@Override
public boolean isDeleted() {
return delegate.isDeleted();
}

@Override
public int getSegmentCount() {
return delegate.getSegmentCount();
}

@Override
public long getGeneration() {
return delegate.getGeneration();
}

@Override
public Map<String, String> getUserData() throws IOException {
return delegate.getUserData();
}

@Override
public String toString() {
return "SnapshotIndexCommit{" + delegate + "}";
}
}
}
13 changes: 6 additions & 7 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -92,7 +91,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public abstract class Engine implements Closeable {

Expand Down Expand Up @@ -568,7 +566,7 @@ public CommitStats commitStats() {
* @return the sequence number service
*/
public abstract LocalCheckpointTracker getLocalCheckpointTracker();

/**
* Global stats on segments.
*/
Expand Down Expand Up @@ -859,9 +857,10 @@ public void forceMerge(boolean flush) throws IOException {
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*
* @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException;
public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;

/**
* fail engine due to some error. the engine will also be closed.
Expand Down Expand Up @@ -1437,9 +1436,9 @@ public static class IndexCommitRef implements Closeable {
private final CheckedRunnable<IOException> onClose;
private final IndexCommit indexCommit;

IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException {
indexCommit = deletionPolicy.snapshot();
onClose = () -> deletionPolicy.release(indexCommit);
IndexCommitRef(IndexCommit indexCommit, CheckedRunnable<IOException> onClose) {
this.indexCommit = indexCommit;
this.onClose = onClose;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public class InternalEngine extends Engine {

private final String uidField;

private final SnapshotDeletionPolicy snapshotDeletionPolicy;
private final CombinedDeletionPolicy combinedDeletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
Expand Down Expand Up @@ -184,9 +184,8 @@ public InternalEngine(EngineConfig engineConfig) {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null :
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
);
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
Expand Down Expand Up @@ -1644,20 +1643,16 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
}

@Override
public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException {
public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
if (flushFirst) {
logger.trace("start flush for snapshot");
flush(false, true);
logger.trace("finish flush for snapshot");
}
try (ReleasableLock lock = readLock.acquire()) {
logger.trace("pulling snapshot");
return new IndexCommitRef(snapshotDeletionPolicy);
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
}
final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down Expand Up @@ -1828,7 +1823,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create, IndexCommit start
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1085,13 +1085,14 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
*
* @param safeCommit <code>true</code> capture the most recent safe commit point; otherwise the most recent commit point.
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
return getEngine().acquireIndexCommit(flushFirst);
return getEngine().acquireIndexCommit(safeCommit, flushFirst);
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
Expand Down Expand Up @@ -1125,7 +1126,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
return store.getMetadata(null, true);
}
}
indexCommit = engine.acquireIndexCommit(false);
indexCommit = engine.acquireIndexCommit(false, false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable {
store.incRef();
boolean success = false;
try {
indexCommit = shard.acquireIndexCommit(true);
indexCommit = shard.acquireIndexCommit(false, true);
success = true;
} finally {
if (success == false) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ final void ensureOpen() {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
Expand All @@ -270,7 +270,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
*
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
phase1Snapshot = shard.acquireIndexCommit(true, false);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
Expand Down
Loading

0 comments on commit 626c3d1

Please sign in to comment.