Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary send safe commit in file-based recovery #28038

Merged
merged 8 commits into from
Jan 11, 2018
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectIntCursor;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
Expand All @@ -42,12 +46,16 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The safe commit point is a constantly moving target (as the global checkpoint keeps going up and new commits are being added). I wonder if it's nicer to calculate the safe commit point when it's accessed in acquireIndexCommit, based on the then current globalcheckpoint and the current list of commits (This will require storing the last seen indexCommits, but that would be equivalent to what's being done in Lucene's SnapshotDeletionPolicy).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ywelsch Nhat has a follow up to trim unneeded commits as soon as the global checkpoint advances enough. This will have a side effect you mention (it will update things) with some added value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but that involves some extra machinery to update safeCommit at the right points in time (e.g. when gcp advances)? My suggestion here makes that unnecessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think the added value of the approach in the follow-up is that the clean-up logic is also possibly more eagerly invoked (when gcp advances).

private IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) {
this.openMode = openMode;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.snapshottedCommits = new ObjectIntHashMap<>();
}

@Override
Expand All @@ -70,18 +78,53 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
}

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
lastCommit = commits.get(commits.size() - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we assert that the translog gen is in this commit is lower than all the ones in higher commits?

safeCommit = commits.get(keptPosition);
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
commits.get(i).delete();
}
}
updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1));
updateTranslogDeletionPolicy();
}

private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException {
assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted";
final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
/**
* Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}.
* Index files and translog of the capturing commit point won't be released until the commit reference is closed.
*
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
*/
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
assert safeCommit != null : "Safe commit is not initialized yet";
assert lastCommit != null : "Last commit is not initialized yet";
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
snapshottedCommits.addTo(snapshotting, 1); // increase refCount
return new SnapshotIndexCommit(snapshotting);
}

/**
* Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}.
*/
synchronized void releaseCommit(final IndexCommit releasingCommit) {
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does that work? releasingCommit is a SnapshotIndexCommit ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The snapshotting commits are stored as keys in a HashMap. Both SnapshotIndexCommit and regular index commit inherit equals and hashCode from the root IndexCommit, thus they are interchangeable. This can be problematic if a regular index commit overrides equals or hashCode.

I see some options to avoid this.

  1. Exposes SnapshotIndexCommit to package level; makes acquireCommit and releaseCommit with SnapshotIndexCommit type.
  2. Delegate hashCode and equals of SnapshotIndexCommit to the original index commit.

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I think it's risky as the underlying IndexCommit may have a different implementation (it's not final). I see a 3rd option - we could use an identity map and sure people only release index commits they got from us. Until we need the ability to work all kind of crazyness like wrapped IndexCommits, I prefer to keep things strict.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lucene's SnapshotDeletionPolicy identifies the IndexCommit's based on their generation (long field).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed and agreed to keep the current implementation.

"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
final int refCount = snapshottedCommits.addTo(releasingCommit, -1); // release refCount
assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]";
if (refCount == 0) {
snapshottedCommits.remove(releasingCommit);
}
}

private void updateTranslogDeletionPolicy() throws IOException {
assert Thread.holdsLock(this);
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
for (ObjectIntCursor<IndexCommit> entry : snapshottedCommits) {
assert entry.key.isDeleted() == false : "Snapshotted commit must not be deleted";
minRequiredGen = Math.min(minRequiredGen, Long.parseLong(entry.key.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)));
}
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

Expand Down Expand Up @@ -139,4 +182,60 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
*/
return 0;
}

/**
* A wrapper of an index commit that prevents it from being deleted.
*/
private static class SnapshotIndexCommit extends IndexCommit {
private final IndexCommit delegate;

SnapshotIndexCommit(IndexCommit delegate) {
this.delegate = delegate;
}

@Override
public String getSegmentsFileName() {
return delegate.getSegmentsFileName();
}

@Override
public Collection<String> getFileNames() throws IOException {
return delegate.getFileNames();
}

@Override
public Directory getDirectory() {
return delegate.getDirectory();
}

@Override
public void delete() {
throw new UnsupportedOperationException("A snapshot commit does not support deletion");
}

@Override
public boolean isDeleted() {
return delegate.isDeleted();
}

@Override
public int getSegmentCount() {
return delegate.getSegmentCount();
}

@Override
public long getGeneration() {
return delegate.getGeneration();
}

@Override
public Map<String, String> getUserData() throws IOException {
return delegate.getUserData();
}

@Override
public String toString() {
return "SnapshotIndexCommit{" + delegate + "}";
}
}
}
13 changes: 6 additions & 7 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -92,7 +91,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public abstract class Engine implements Closeable {

Expand Down Expand Up @@ -568,7 +566,7 @@ public CommitStats commitStats() {
* @return the sequence number service
*/
public abstract LocalCheckpointTracker getLocalCheckpointTracker();

/**
* Global stats on segments.
*/
Expand Down Expand Up @@ -859,9 +857,10 @@ public void forceMerge(boolean flush) throws IOException {
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*
* @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException;
public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should look in future to remove one of these booleans - either you want to "get everything" or you want a safe commit. I don't think there's a point in "flush but give me a safe commit"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Are you ok if we replace this by having two methods: acquireLastIndexCommit(flushFirst) and acquireSafeIndexCommit(no option)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree. Are you ok if we replace this by having two methods: acquireLastIndexCommit(flushFirst) and acquireSafeIndexCommit(no option)?

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double checking - the new methods will be a follow up?


/**
* fail engine due to some error. the engine will also be closed.
Expand Down Expand Up @@ -1437,9 +1436,9 @@ public static class IndexCommitRef implements Closeable {
private final CheckedRunnable<IOException> onClose;
private final IndexCommit indexCommit;

IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException {
indexCommit = deletionPolicy.snapshot();
onClose = () -> deletionPolicy.release(indexCommit);
IndexCommitRef(IndexCommit indexCommit, CheckedRunnable<IOException> onClose) {
this.indexCommit = indexCommit;
this.onClose = onClose;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public class InternalEngine extends Engine {

private final String uidField;

private final SnapshotDeletionPolicy snapshotDeletionPolicy;
private final CombinedDeletionPolicy combinedDeletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
Expand Down Expand Up @@ -184,9 +184,8 @@ public InternalEngine(EngineConfig engineConfig) {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null :
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
);
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
Expand Down Expand Up @@ -1644,20 +1643,16 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
}

@Override
public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException {
public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
if (flushFirst) {
logger.trace("start flush for snapshot");
flush(false, true);
logger.trace("finish flush for snapshot");
}
try (ReleasableLock lock = readLock.acquire()) {
logger.trace("pulling snapshot");
return new IndexCommitRef(snapshotDeletionPolicy);
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means a potential change to the exception type. Can you double check it's OK?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ok. The method snapshot of Lucene's SnapshotDeletionPolicy throws IOException but we don't. Acquiring a commit is just increasing refCount of a commit.

}
final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down Expand Up @@ -1828,7 +1823,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create, IndexCommit start
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1085,13 +1085,14 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
*
* @param safeCommit <code>true</code> capture the most recent safe commit point; otherwise the most recent commit point.
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
return getEngine().acquireIndexCommit(flushFirst);
return getEngine().acquireIndexCommit(safeCommit, flushFirst);
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
Expand Down Expand Up @@ -1125,7 +1126,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
return store.getMetadata(null, true);
}
}
indexCommit = engine.acquireIndexCommit(false);
indexCommit = engine.acquireIndexCommit(false, false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable {
store.incRef();
boolean success = false;
try {
indexCommit = shard.acquireIndexCommit(true);
indexCommit = shard.acquireIndexCommit(false, true);
success = true;
} finally {
if (success == false) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ final void ensureOpen() {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
Expand All @@ -270,7 +270,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
*
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
phase1Snapshot = shard.acquireIndexCommit(true, false);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
Expand Down
Loading