Skip to content

Commit

Permalink
Return non-deletable commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 10, 2018
1 parent 7f4d848 commit cce0742
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import com.carrotsearch.hppc.cursors.ObjectIntCursor;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -94,22 +96,25 @@ public synchronized void onCommit(List<? extends IndexCommit> commits) throws IO
*
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
*/
synchronized Engine.IndexCommitRef acquireIndexCommit(boolean acquiringSafeCommit) {
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
assert safeCommit != null : "Safe commit is not initialized yet";
assert lastCommit != null : "Last commit is not initialized yet";
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
snapshottedCommits.addTo(snapshotting, 1); // increase refCount
return new Engine.IndexCommitRef(snapshotting, () -> releaseCommit(snapshotting));
return new SnapshotIndexCommit(snapshotting);
}

private synchronized void releaseCommit(IndexCommit releasingCommit) throws IOException {
/**
* Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}.
*/
synchronized void releaseCommit(final IndexCommit snapshotCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
final int refCount = snapshottedCommits.addTo(releasingCommit, -1); // release refCount
assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]";
if (refCount == 0) {
snapshottedCommits.remove(releasingCommit);
updateTranslogDeletionPolicy();
}
}

Expand Down Expand Up @@ -178,4 +183,60 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
*/
return 0;
}

/**
* A wrapper of an index commit that prevents it from being deleted.
*/
private static class SnapshotIndexCommit extends IndexCommit {
private final IndexCommit delegate;

SnapshotIndexCommit(IndexCommit delegate) {
this.delegate = delegate;
}

@Override
public String getSegmentsFileName() {
return delegate.getSegmentsFileName();
}

@Override
public Collection<String> getFileNames() throws IOException {
return delegate.getFileNames();
}

@Override
public Directory getDirectory() {
return delegate.getDirectory();
}

@Override
public void delete() {
throw new UnsupportedOperationException("A snapshot commit does not support deletion");
}

@Override
public boolean isDeleted() {
return delegate.isDeleted();
}

@Override
public int getSegmentCount() {
return delegate.getSegmentCount();
}

@Override
public long getGeneration() {
return delegate.getGeneration();
}

@Override
public Map<String, String> getUserData() throws IOException {
return delegate.getUserData();
}

@Override
public String toString() {
return "SnapshotIndexCommit{" + delegate + "}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1651,7 +1651,8 @@ public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean
flush(false, true);
logger.trace("finish flush for snapshot");
}
return combinedDeletionPolicy.acquireIndexCommit(safeCommit);
final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.carrotsearch.hppc.LongArrayList;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
Expand Down Expand Up @@ -103,23 +102,23 @@ public void testAcquireIndexCommit() throws Exception {
final IndexCommit c2 = mockIndexCommit(maxSeqNo2, translogUUID, translogGen2);
globalCheckpoint.set(randomLongBetween(0, maxSeqNo2 - 1)); // Keep both c1 and c2.
indexPolicy.onCommit(Arrays.asList(c1, c2));
final Engine.IndexCommitRef ref1 = indexPolicy.acquireIndexCommit(true);
assertThat(ref1.getIndexCommit(), equalTo(c1));
final Engine.IndexCommitRef ref2 = indexPolicy.acquireIndexCommit(false);
assertThat(ref2.getIndexCommit(), equalTo(c2));
final IndexCommit ref1 = indexPolicy.acquireIndexCommit(true);
assertThat(ref1, equalTo(c1));
expectThrows(UnsupportedOperationException.class, ref1::delete);
final IndexCommit ref2 = indexPolicy.acquireIndexCommit(false);
assertThat(ref2, equalTo(c2));
expectThrows(UnsupportedOperationException.class, ref2::delete);
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), lessThanOrEqualTo(100L));

globalCheckpoint.set(randomLongBetween(maxSeqNo2, Long.MAX_VALUE));
indexPolicy.onCommit(Arrays.asList(c1, c2)); // Policy keeps c2 only, but c1 is snapshotted.
verify(c1, times(0)).delete();
final Engine.IndexCommitRef ref3 = indexPolicy.acquireIndexCommit(true);
assertThat(ref3.getIndexCommit(), equalTo(c2));
final IndexCommit ref3 = indexPolicy.acquireIndexCommit(true);
assertThat(ref3, equalTo(c2));
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGen1));
ref1.close(); // closing acquired commit releases translog and commit
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGen2));
indexPolicy.releaseCommit(ref1); // release acquired commit releases translog and commit
indexPolicy.onCommit(Arrays.asList(c1, c2)); // Flush new commit deletes c1
verify(c1, times(1)).delete();
IOUtils.close(ref2, ref3);
}

public void testLegacyIndex() throws Exception {
Expand Down

0 comments on commit cce0742

Please sign in to comment.