Skip to content

Commit

Permalink
Avoid casting - manually mock index commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 10, 2018
1 parent cce0742 commit 0285001
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
/**
* Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}.
*/
synchronized void releaseCommit(final IndexCommit snapshotCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
synchronized void releaseCommit(final IndexCommit releasingCommit) {
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
final int refCount = snapshottedCommits.addTo(releasingCommit, -1); // release refCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.singletonList;
Expand All @@ -43,10 +45,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class CombinedDeletionPolicyTests extends ESTestCase {

Expand All @@ -57,7 +55,7 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {

final LongArrayList maxSeqNoList = new LongArrayList();
final LongArrayList translogGenList = new LongArrayList();
final List<IndexCommit> commitList = new ArrayList<>();
final List<MockIndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = 0;
long lastTranslogGen = 0;
Expand All @@ -79,9 +77,9 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {

for (int i = 0; i < commitList.size(); i++) {
if (i < keptIndex) {
verify(commitList.get(i), times(1)).delete();
assertThat(commitList.get(i).deleteTimes(), equalTo(1));
} else {
verify(commitList.get(i), never()).delete();
assertThat(commitList.get(i).deleteTimes(), equalTo(0));
}
}
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGenList.get(keptIndex)));
Expand All @@ -96,10 +94,10 @@ public void testAcquireIndexCommit() throws Exception {

final long maxSeqNo1 = between(1, 1000);
final long translogGen1 = between(1, 100);
final IndexCommit c1 = mockIndexCommit(maxSeqNo1, translogUUID, translogGen1);
final MockIndexCommit c1 = mockIndexCommit(maxSeqNo1, translogUUID, translogGen1);
final long maxSeqNo2 = maxSeqNo1 + between(1, 1000);
final long translogGen2 = translogGen1 + between(1, 100);
final IndexCommit c2 = mockIndexCommit(maxSeqNo2, translogUUID, translogGen2);
final MockIndexCommit c2 = mockIndexCommit(maxSeqNo2, translogUUID, translogGen2);
globalCheckpoint.set(randomLongBetween(0, maxSeqNo2 - 1)); // Keep both c1 and c2.
indexPolicy.onCommit(Arrays.asList(c1, c2));
final IndexCommit ref1 = indexPolicy.acquireIndexCommit(true);
Expand All @@ -112,13 +110,13 @@ public void testAcquireIndexCommit() throws Exception {

globalCheckpoint.set(randomLongBetween(maxSeqNo2, Long.MAX_VALUE));
indexPolicy.onCommit(Arrays.asList(c1, c2)); // Policy keeps c2 only, but c1 is snapshotted.
verify(c1, times(0)).delete();
assertThat(c1.deleteTimes(), equalTo(0));
final IndexCommit ref3 = indexPolicy.acquireIndexCommit(true);
assertThat(ref3, equalTo(c2));
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGen1));
indexPolicy.releaseCommit(ref1); // release acquired commit releases translog and commit
indexPolicy.onCommit(Arrays.asList(c1, c2)); // Flush new commit deletes c1
verify(c1, times(1)).delete();
assertThat(c1.deleteTimes(), equalTo(1));
}

public void testLegacyIndex() throws Exception {
Expand All @@ -129,28 +127,28 @@ public void testLegacyIndex() throws Exception {
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);

long legacyTranslogGen = randomNonNegativeLong();
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
MockIndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
indexPolicy.onInit(singletonList(legacyCommit));
verify(legacyCommit, never()).delete();
assertThat(legacyCommit.deleteTimes(), equalTo(0));
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen));

long safeTranslogGen = randomLongBetween(legacyTranslogGen, Long.MAX_VALUE);
long maxSeqNo = randomLongBetween(1, Long.MAX_VALUE);
final IndexCommit freshCommit = mockIndexCommit(maxSeqNo, translogUUID, safeTranslogGen);
final MockIndexCommit freshCommit = mockIndexCommit(maxSeqNo, translogUUID, safeTranslogGen);

globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(0)).delete();
verify(freshCommit, times(0)).delete();
assertThat(legacyCommit.deleteTimes(), equalTo(0));
assertThat(freshCommit.deleteTimes(), equalTo(0));
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));

// Make the fresh commit safe.
globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(1)).delete();
verify(freshCommit, times(0)).delete();
assertThat(legacyCommit.deleteTimes(), equalTo(1));
assertThat(freshCommit.deleteTimes(), equalTo(0));
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
}
Expand All @@ -161,7 +159,7 @@ public void testDeleteInvalidCommits() throws Exception {
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get);

final int invalidCommits = between(1, 10);
final List<IndexCommit> commitList = new ArrayList<>();
final List<MockIndexCommit> commitList = new ArrayList<>();
for (int i = 0; i < invalidCommits; i++) {
commitList.add(mockIndexCommit(randomNonNegativeLong(), UUID.randomUUID(), randomNonNegativeLong()));
}
Expand All @@ -177,27 +175,84 @@ public void testDeleteInvalidCommits() throws Exception {
// We should never keep invalid commits regardless of the value of the global checkpoint.
indexPolicy.onCommit(commitList);
for (int i = 0; i < invalidCommits - 1; i++) {
verify(commitList.get(i), times(1)).delete();
assertThat(commitList.get(i).deleteTimes(), equalTo(1));
}
}

IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
MockIndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
final IndexCommit commit = mock(IndexCommit.class);
when(commit.getUserData()).thenReturn(userData);
when(commit.getDirectory()).thenReturn(mock(Directory.class));
return commit;
return new MockIndexCommit(randomNonNegativeLong(), mock(Directory.class), userData);
}

IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException {
MockIndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
final IndexCommit commit = mock(IndexCommit.class);
when(commit.getUserData()).thenReturn(userData);
return commit;
return new MockIndexCommit(randomNonNegativeLong(), mock(Directory.class), userData);
}

static class MockIndexCommit extends IndexCommit {
private final long generation;
private final Directory directory;
private final Map<String, String> userData;
private final AtomicInteger deleteTimes = new AtomicInteger();

MockIndexCommit(long generation, Directory directory, Map<String, String> userData) {
this.generation = generation;
this.directory = directory;
this.userData = userData;
}

@Override
public String getSegmentsFileName() {
throw new UnsupportedOperationException();
}

@Override
public Collection<String> getFileNames() throws IOException {
throw new UnsupportedOperationException();
}

@Override
public Directory getDirectory() {
return directory;
}

@Override
public void delete() {
deleteTimes.getAndIncrement();
}

int deleteTimes(){
return deleteTimes.get();
}

@Override
public boolean isDeleted() {
return deleteTimes.get() > 0;
}

@Override
public int getSegmentCount() {
throw new UnsupportedOperationException();
}

@Override
public long getGeneration() {
return generation;
}

@Override
public Map<String, String> getUserData() throws IOException {
return userData;
}

@Override
public String toString() {
return "MockIndexCommit{" + userData + "}";
}
}
}

0 comments on commit 0285001

Please sign in to comment.