Skip to content

Commit

Permalink
Add test specific to translog trimming.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed May 12, 2022
1 parent 5ea0e7b commit 5bccdd8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ private Translog openTranslog(
LongConsumer persistedSequenceNumberConsumer
) throws IOException {
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final Map<String, String> userData = lastCommittedSegmentInfos.getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;

public class NRTReplicationEngineTests extends EngineTestCase {

public void testCreateEngine() throws IOException {
Expand Down Expand Up @@ -68,6 +75,9 @@ public void testEngineWritesOpsToTranslog() throws Exception {
applyOperation(nrtEngine, op);
}

assertEquals(nrtEngine.getTranslogLastWriteLocation(), engine.getTranslogLastWriteLocation());
assertEquals(nrtEngine.getLastSyncedGlobalCheckpoint(), engine.getLastSyncedGlobalCheckpoint());

// we don't index into nrtEngine, so get the doc ids from the regular engine.
final List<DocIdSeqNoAndSource> docs = getDocIds(engine, true);

Expand Down Expand Up @@ -103,6 +113,18 @@ public void testUpdateSegments() throws Exception {
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
assertMatchingSegmentsAndCheckpoints(nrtEngine);

// assert a doc from the operations exists.
final ParsedDocument parsedDoc = createParsedDoc(operations.stream().findFirst().get().id(), null);
try (Engine.GetResult getResult = engine.get(newGet(true, parsedDoc), engine::acquireSearcher)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}

try (Engine.GetResult getResult = nrtEngine.get(newGet(true, parsedDoc), nrtEngine::acquireSearcher)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}

// Flush the primary and update the NRTEngine with the latest committed infos.
engine.flush();
nrtEngine.syncTranslog(); // to advance persisted checkpoint
Expand All @@ -120,6 +142,37 @@ public void testUpdateSegments() throws Exception {
}
}

public void testTrimTranslogOps() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 100),
randomBoolean(),
randomBoolean(),
randomBoolean()
);
applyOperations(nrtEngine, operations);
Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos)
);
}
nrtEngine.rollTranslogGeneration();
nrtEngine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED);
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(0));
assertNull(snapshot.next());
}
}
}

private void assertMatchingSegmentsAndCheckpoints(NRTReplicationEngine nrtEngine) throws IOException {
assertEquals(engine.getPersistedLocalCheckpoint(), nrtEngine.getPersistedLocalCheckpoint());
assertEquals(engine.getProcessedLocalCheckpoint(), nrtEngine.getProcessedLocalCheckpoint());
Expand Down

0 comments on commit 5bccdd8

Please sign in to comment.