diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index db724112574a2..585406d01a6f6 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -697,8 +697,7 @@ private void maybeFSyncTranslogs() { if (indexSettings.getTranslogDurability() == Translog.Durability.ASYNC) { for (IndexShard shard : this.shards.values()) { try { - Translog translog = shard.getTranslog(); - if (translog.syncNeeded()) { + if (shard.isSyncNeeded()) { shard.sync(); } } catch (AlreadyClosedException ex) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index fab8cba468b56..4c782cb500418 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -66,6 +66,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStats; import java.io.Closeable; import java.io.FileNotFoundException; @@ -510,8 +511,18 @@ public enum SearcherScope { EXTERNAL, INTERNAL } - /** returns the translog for this engine */ - public abstract Translog getTranslog(); + /** + * Returns the translog associated with this engine. + * Prefer to keep the translog package-private, so that an engine can control all accesses to the translog. + */ + abstract Translog getTranslog(); + + /** + * Checks if the underlying storage sync is required. + */ + public boolean isTranslogSyncNeeded() { + return getTranslog().syncNeeded(); + } /** * Ensures that all locations in the given stream have been written to the underlying storage. @@ -520,6 +531,36 @@ public enum SearcherScope { public abstract void syncTranslog() throws IOException; + public Closeable acquireTranslogRetentionLock() { + return getTranslog().acquireRetentionLock(); + } + + /** + * Creates a new translog snapshot from this engine for reading translog operations whose seq# at least the provided seq#. + * The caller has to close the returned snapshot after finishing the reading. + */ + public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { + return getTranslog().newSnapshotFromMinSeqNo(minSeqNo); + } + + /** + * Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#. + */ + public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { + return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo); + } + + public TranslogStats getTranslogStats() { + return getTranslog().stats(); + } + + /** + * Returns the last location that the translog of this engine has written into. + */ + public Translog.Location getTranslogLastWriteLocation() { + return getTranslog().getLastWriteLocation(); + } + protected final void ensureOpen(Exception suppressed) { if (isClosed.get()) { AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get()); @@ -546,6 +587,13 @@ public CommitStats commitStats() { */ public abstract LocalCheckpointTracker getLocalCheckpointTracker(); + /** + * Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint) + */ + public long getLastSyncedGlobalCheckpoint() { + return getTranslog().getLastSyncedGlobalCheckpoint(); + } + /** * Global stats on segments. */ @@ -810,6 +858,16 @@ public final boolean refreshNeeded() { */ public abstract void trimTranslog() throws EngineException; + /** + * Tests whether or not the translog generation should be rolled to a new generation. + * This test is based on the size of the current generation compared to the configured generation threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + public boolean shouldRollTranslogGeneration() { + return getTranslog().shouldRollGeneration(); + } + /** * Rolls the translog generation and cleans unneeded. */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index dcd1ba65d8950..b28a5cd59e25b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -422,7 +422,7 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy } @Override - public Translog getTranslog() { + Translog getTranslog() { ensureOpen(); return translog; } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 0ec03cb7a8f5e..9b55cff8cff9a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -131,7 +131,7 @@ protected ReplicaResult shardOperationOnReplica(final Request request, final Ind private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST && - indexShard.getTranslog().getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) { + indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) { indexShard.sync(); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 520115dc30a46..def6362e334e4 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -934,7 +934,7 @@ public FieldDataStats fieldDataStats(String... fields) { } public TranslogStats translogStats() { - return getEngine().getTranslog().stats(); + return getEngine().getTranslogStats(); } public CompletionStats completionStats(String... fields) { @@ -1331,7 +1331,7 @@ private boolean assertMaxUnsafeAutoIdInCommit() throws IOException { } protected void onNewEngine(Engine newEngine) { - refreshListeners.setTranslog(newEngine.getTranslog()); + refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); } /** @@ -1563,8 +1563,7 @@ boolean shouldRollTranslogGeneration() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - final Translog translog = engine.getTranslog(); - return translog.shouldRollGeneration(); + return engine.shouldRollTranslogGeneration(); } catch (final AlreadyClosedException e) { // we are already closed, no need to flush or roll } @@ -1579,9 +1578,26 @@ public void onSettingsChanged() { } } + /** + * Acquires a lock on the translog files, preventing them from being trimmed. + */ public Closeable acquireTranslogRetentionLock() { - Engine engine = getEngine(); - return engine.getTranslog().acquireRetentionLock(); + return getEngine().acquireTranslogRetentionLock(); + } + + /** + * Creates a new translog snapshot for reading translog operations whose seq# at least the provided seq#. + * The caller has to close the returned snapshot after finishing the reading. + */ + public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { + return getEngine().newTranslogSnapshotFromMinSeqNo(minSeqNo); + } + + /** + * Returns the estimated number of operations in translog whose seq# at least the provided seq#. + */ + public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { + return getEngine().estimateTranslogOperationsFromMinSeq(minSeqNo); } public List segments(boolean verbose) { @@ -1592,10 +1608,6 @@ public void flushAndCloseEngine() throws IOException { getEngine().flushAndClose(); } - public Translog getTranslog() { - return getEngine().getTranslog(); - } - public String getHistoryUUID() { return getEngine().getHistoryUUID(); } @@ -1733,6 +1745,13 @@ public long getGlobalCheckpoint() { return replicationTracker.getGlobalCheckpoint(); } + /** + * Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint) + */ + public long getLastSyncedGlobalCheckpoint() { + return getEngine().getLastSyncedGlobalCheckpoint(); + } + /** * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. * @@ -2308,6 +2327,13 @@ public void sync() throws IOException { getEngine().syncTranslog(); } + /** + * Checks if the underlying storage sync is required. + */ + public boolean isSyncNeeded() { + return getEngine().isTranslogSyncNeeded(); + } + /** * Returns the current translog durability mode */ @@ -2467,7 +2493,7 @@ final long getLastSearcherAccess() { } private void setRefreshPending(Engine engine) { - Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation(); + Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation(); Translog.Location location; do { location = this.pendingRefreshLocation.get(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 1e31eae7d417f..af8c9bdd0272f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -83,7 +83,7 @@ public void resync(final IndexShard indexShard, final ActionListener ActionListener resyncListener = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; - Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); + Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); resyncListener = new ActionListener() { @Override public void onResponse(final ResyncTask resyncTask) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java index 17e824eb046c7..d8a51d58ad956 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.IntSupplier; +import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -153,21 +154,20 @@ public int pendingCount() { /** * Setup the translog used to find the last refreshed location. */ - public void setTranslog(Translog translog) { - this.translog = translog; + public void setCurrentRefreshLocationSupplier(Supplier currentRefreshLocationSupplier) { + this.currentRefreshLocationSupplier = currentRefreshLocationSupplier; } - // Implementation of ReferenceManager.RefreshListener that adapts Lucene's RefreshListener into Elasticsearch's refresh listeners. - private Translog translog; /** * Snapshot of the translog location before the current refresh if there is a refresh going on or null. Doesn't have to be volatile * because when it is used by the refreshing thread. */ private Translog.Location currentRefreshLocation; + private Supplier currentRefreshLocationSupplier; @Override public void beforeRefresh() throws IOException { - currentRefreshLocation = translog.getLastWriteLocation(); + currentRefreshLocation = currentRefreshLocationSupplier.get(); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 710b4bc46e235..78f44ee723114 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -145,9 +145,6 @@ public RecoveryResponse recoverToTarget() throws IOException { }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered "); try (Closeable ignored = shard.acquireTranslogRetentionLock()) { - - final Translog translog = shard.getTranslog(); - final long startingSeqNo; final long requiredSeqNoRangeStart; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && @@ -170,7 +167,7 @@ public RecoveryResponse recoverToTarget() throws IOException { requiredSeqNoRangeStart = Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; try { - phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); + phase1(phase1Snapshot.getIndexCommit(), () -> shard.estimateTranslogOperationsFromMinSeq(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { @@ -187,7 +184,7 @@ public RecoveryResponse recoverToTarget() throws IOException { try { // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); + prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, shard.estimateTranslogOperationsFromMinSeq(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } @@ -210,9 +207,9 @@ public RecoveryResponse recoverToTarget() throws IOException { logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); - logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); + logger.trace("snapshot translog for recovery; current size is [{}]", shard.estimateTranslogOperationsFromMinSeq(startingSeqNo)); final long targetLocalCheckpoint; - try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) { + try(Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) { targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); @@ -261,7 +258,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one if (startingSeqNo - 1 <= localCheckpoint) { final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { + try (Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 5000af6688f83..4dc6a859a5c5a 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -250,7 +250,7 @@ public void testAsyncFsyncActuallyWorks() throws Exception { client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); IndexShard shard = indexService.getShard(0); assertBusy(() -> { - assertFalse(shard.getTranslog().syncNeeded()); + assertFalse(shard.isSyncNeeded()); }); } @@ -275,7 +275,7 @@ public void testRescheduleAsyncFsync() throws Exception { client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); assertNotNull(indexService.getFsyncTask()); final IndexShard shard = indexService.getShard(0); - assertBusy(() -> assertFalse(shard.getTranslog().syncNeeded())); + assertBusy(() -> assertFalse(shard.isSyncNeeded())); client() .admin() @@ -311,7 +311,7 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception { indexService.updateMetaData(metaData); IndexShard shard = indexService.getShard(0); - assertBusy(() -> assertThat(shard.getTranslog().totalOperations(), equalTo(0))); + assertBusy(() -> assertThat(shard.estimateTranslogOperationsFromMinSeq(0L), equalTo(0))); } public void testIllegalFsyncInterval() { diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 97fc1b528acf3..fd62318f96c3a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -681,7 +681,7 @@ class GlobalCheckpointSync extends ReplicationAction< @Override protected PrimaryResult performOnPrimary( final IndexShard primary, final GlobalCheckpointSyncAction.Request request) throws Exception { - primary.getTranslog().sync(); + primary.sync(); return new PrimaryResult(request, new ReplicationResponse()); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index baa56ee9585f6..2d2aaac7bbd26 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -330,7 +330,7 @@ public void testSeqNoCollision() throws Exception { final Translog.Operation op1; final List initOperations = new ArrayList<>(initDocs); - try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); for (int i = 0; i < initDocs; i++) { Translog.Operation op = snapshot.next(); @@ -347,7 +347,7 @@ public void testSeqNoCollision() throws Exception { shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed. shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); final Translog.Operation op2; - try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 2)); op2 = snapshot.next(); assertThat(op2.seqNo(), equalTo(op1.seqNo())); @@ -362,7 +362,7 @@ public void testSeqNoCollision() throws Exception { shards.promoteReplicaToPrimary(replica2); logger.info("--> Recover replica3 from replica2"); recoverReplica(replica3, replica2); - try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); assertThat(snapshot.next(), equalTo(op2)); assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); @@ -468,7 +468,7 @@ private static void assertNoOpTranslogOperationForDocumentFailure( long expectedPrimaryTerm, String failureMessage) throws IOException { for (IndexShard indexShard : replicationGroup) { - try(Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshot()) { + try(Translog.Snapshot snapshot = getTranslog(indexShard).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(expectedOperation)); long expectedSeqNo = 0L; Translog.Operation op = snapshot.next(); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index c7469f2432ad3..323b0364dfb93 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -128,7 +128,7 @@ public void testRecoveryOfDisconnectedReplica() throws Exception { shards.flush(); translogTrimmed = randomBoolean(); if (translogTrimmed) { - final Translog translog = shards.getPrimary().getTranslog(); + final Translog translog = getTranslog(shards.getPrimary()); translog.getDeletionPolicy().setRetentionAgeInMillis(0); translog.trimUnreferencedReaders(); } @@ -271,7 +271,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { // otherwise the deletion policy won't trim translog assertBusy(() -> { shards.syncGlobalCheckpoint(); - assertThat(newPrimary.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo())); + assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo())); }); newPrimary.flush(new FlushRequest()); uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); @@ -340,7 +340,7 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { // Index more docs - move the global checkpoint >= seqno of the stale operations. goodDocs += shards.indexDocs(scaledRandomIntBetween(staleDocs, staleDocs * 5)); shards.syncGlobalCheckpoint(); - assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo())); + assertThat(replica.getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo())); // Recover a replica again should also rollback the stale documents. shards.removeReplica(replica); replica.close("recover replica - second time", false); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index 70813531aeb0e..596575abc3025 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -90,9 +90,6 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { final Translog.Durability durability = randomFrom(Translog.Durability.ASYNC, Translog.Durability.REQUEST); when(indexShard.getTranslogDurability()).thenReturn(durability); - final Translog translog = mock(Translog.class); - when(indexShard.getTranslog()).thenReturn(translog); - final long globalCheckpoint = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE); final long lastSyncedGlobalCheckpoint; if (randomBoolean() && globalCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) { @@ -104,7 +101,7 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { } when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint); - when(translog.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint); + when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint); final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction( Settings.EMPTY, diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index f7ee54b32ee84..bc34aa60c4925 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -107,6 +107,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog; public class IndexShardIT extends ESSingleNodeTestCase { @@ -167,7 +168,7 @@ public void testDurableFlagHasEffect() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); IndexShard shard = test.getShardOrNull(0); - Translog translog = ShardUtilsTests.getShardEngine(shard).getTranslog(); + Translog translog = getTranslog(shard); Predicate needsSync = (tlog) -> { // we can't use tlog.needsSync() here since it also takes the global checkpoint into account // we explicitly want to check here if our durability checks are taken into account so we only @@ -343,7 +344,7 @@ public void testMaybeFlush() throws Exception { SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); assertTrue(shard.shouldPeriodicallyFlush()); - final Translog translog = shard.getEngine().getTranslog(); + final Translog translog = getTranslog(shard); assertEquals(2, translog.stats().getUncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); @@ -384,7 +385,7 @@ public void testMaybeRollTranslogGeneration() throws Exception { final IndexService test = indicesService.indexService(resolveIndex("test")); final IndexShard shard = test.getShardOrNull(0); int rolls = 0; - final Translog translog = shard.getEngine().getTranslog(); + final Translog translog = getTranslog(shard); final long generation = translog.currentFileGeneration(); final int numberOfDocuments = randomIntBetween(32, 128); for (int i = 0; i < numberOfDocuments; i++) { @@ -454,11 +455,11 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { assertThat(shard.flushStats().getPeriodic(), equalTo(periodic + 1)); }; } else { - final long generation = shard.getEngine().getTranslog().currentFileGeneration(); + final long generation = getTranslog(shard).currentFileGeneration(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); check = () -> assertEquals( generation + 1, - shard.getEngine().getTranslog().currentFileGeneration()); + getTranslog(shard).currentFileGeneration()); } assertBusy(check); running.set(false); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5506bc515f24c..b608bc9cc5081 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -72,7 +72,6 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; @@ -518,7 +517,7 @@ public void onFailure(Exception e) { public void testPrimaryPromotionRollsGeneration() throws Exception { final IndexShard indexShard = newStartedShard(false); - final long currentTranslogGeneration = indexShard.getTranslog().getGeneration().translogFileGeneration; + final long currentTranslogGeneration = getTranslog(indexShard).getGeneration().translogFileGeneration; // promote the replica final ShardRouting replicaRouting = indexShard.routingEntry(); @@ -556,8 +555,8 @@ public void onFailure(Exception e) { ThreadPool.Names.GENERIC, ""); latch.await(); - assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(currentTranslogGeneration + 1)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); + assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, equalTo(currentTranslogGeneration + 1)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); closeShards(indexShard); } @@ -578,7 +577,7 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E true, ShardRoutingState.STARTED, replicaRouting.allocationId()); final long newPrimaryTerm = indexShard.getPrimaryTerm() + between(1, 1000); indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> { - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); }, 0L, Collections.singleton(indexShard.routingEntry().allocationId().getId()), new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(), @@ -669,7 +668,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { } final long primaryTerm = indexShard.getPrimaryTerm(); - final long translogGen = engineClosed ? -1 : indexShard.getTranslog().getGeneration().translogFileGeneration; + final long translogGen = engineClosed ? -1 : getTranslog(indexShard).getGeneration().translogFileGeneration; final Releasable operation1; final Releasable operation2; @@ -747,7 +746,7 @@ public void onFailure(Exception e) { @Override public void onResponse(Releasable releasable) { assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); onResponse.set(true); @@ -793,25 +792,25 @@ private void finish() { assertFalse(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(primaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm)); Releasables.close(operation1); // our operation should still be blocked assertFalse(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(primaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm)); Releasables.close(operation2); barrier.await(); // now lock acquisition should have succeeded assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); if (engineClosed) { assertFalse(onResponse.get()); assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class)); } else { assertTrue(onResponse.get()); assertNull(onFailure.get()); - assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, equalTo(translogGen + 1)); assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); } @@ -1647,7 +1646,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException { assertEquals(1, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - try (Translog.Snapshot snapshot = newShard.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(newShard).newSnapshot()) { Translog.Operation operation; int numNoops = 0; while ((operation = snapshot.next()) != null) { @@ -2048,7 +2047,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { @Override public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); - assertFalse(replica.getTranslog().syncNeeded()); + assertFalse(replica.isSyncNeeded()); return localCheckpoint; } }, true); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 5803bf263633d..2d1c1d4e15af8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -133,7 +133,7 @@ indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilari (e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm); engine = new InternalEngine(config); engine.recoverFromTranslog(); - listeners.setTranslog(engine.getTranslog()); + listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } @After diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index d65d40e5bcdaa..91b35594772cf 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -59,7 +59,7 @@ public void testGetStartingSeqNo() throws Exception { } flushShard(replica); replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test"); - replica.getTranslog().sync(); + replica.sync(); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); recoveryTarget.decRef(); @@ -81,7 +81,7 @@ public void testGetStartingSeqNo() throws Exception { // Advances the global checkpoint, a safe commit also advances { replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test"); - replica.getTranslog().sync(); + replica.sync(); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs)); recoveryTarget.decRef(); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index f46ab7ebbd603..4e9d0ccb22e11 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -61,7 +61,7 @@ public void testTranslogHistoryTransferred() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startPrimary(); int docs = shards.indexDocs(10); - shards.getPrimary().getTranslog().rollGeneration(); + getTranslog(shards.getPrimary()).rollGeneration(); shards.flush(); if (randomBoolean()) { docs += shards.indexDocs(10); @@ -69,7 +69,7 @@ public void testTranslogHistoryTransferred() throws Exception { shards.addReplica(); shards.startAll(); final IndexShard replica = shards.getReplicas().get(0); - assertThat(replica.getTranslog().totalOperations(), equalTo(docs)); + assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(docs)); } } @@ -77,7 +77,7 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startPrimary(); shards.indexDocs(10); - shards.getPrimary().getTranslog().rollGeneration(); + getTranslog(shards.getPrimary()).rollGeneration(); shards.flush(); shards.indexDocs(10); final IndexShard replica = shards.addReplica(); @@ -99,7 +99,7 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception { releaseRecovery.countDown(); future.get(); // rolling/flushing is async - assertBusy(() -> assertThat(replica.getTranslog().totalOperations(), equalTo(0))); + assertBusy(() -> assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(0))); } } @@ -123,7 +123,7 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { // delete #1 orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL, u -> {}); - orgReplica.getTranslog().rollGeneration(); // isolate the delete in it's own generation + getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation // index #0 orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON), u -> {}); @@ -167,7 +167,7 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { shards.recoverReplica(newReplica); shards.assertAllEqual(3); - assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps)); + assertThat(newReplica.estimateTranslogOperationsFromMinSeq(0), equalTo(translogOps)); } } @@ -184,7 +184,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { IndexShard replica = shards.getReplicas().get(0); final String historyUUID = replica.getHistoryUUID(); - Translog.TranslogGeneration translogGeneration = replica.getTranslog().getGeneration(); + Translog.TranslogGeneration translogGeneration = getTranslog(replica).getGeneration(); shards.removeReplica(replica); replica.close("test", false); IndexWriterConfig iwc = new IndexWriterConfig(null) @@ -219,7 +219,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { shards.recoverReplica(newReplica); // file based recovery should be made assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - assertThat(newReplica.getTranslog().totalOperations(), equalTo(numDocs)); + assertThat(newReplica.estimateTranslogOperationsFromMinSeq(0), equalTo(numDocs)); // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); @@ -238,7 +238,7 @@ public void testPeerRecoveryPersistGlobalCheckpoint() throws Exception { } final IndexShard replica = shards.addReplica(); shards.recoverReplica(replica); - assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(numDocs - 1)); + assertThat(replica.getLastSyncedGlobalCheckpoint(), equalTo(numDocs - 1)); } } @@ -291,7 +291,7 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { final IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); - try (Translog.Snapshot snapshot = newReplica.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(newReplica).newSnapshot()) { assertThat("Sequence based recovery should keep existing translog", snapshot, SnapshotMatchers.size(initDocs + moreDocs)); } assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedDocs + moreDocs)); @@ -321,7 +321,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { shards.recoverReplica(replica); // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false) assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); - assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs)); + assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(numDocs)); shards.assertAllEqual(numDocs); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index dea92c2927d86..8fff17900b072 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -493,4 +493,11 @@ protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, startTime); } + + /** + * Exposes a translog associated with the given engine for testing purpose. + */ + public static Translog getTranslog(Engine engine) { + return engine.getTranslog(); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 0d535d9af3851..a0e1cfc334110 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -55,6 +55,7 @@ import org.elasticsearch.index.cache.query.DisabledQueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -66,6 +67,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; @@ -643,6 +645,10 @@ public static Engine getEngine(IndexShard indexShard) { return indexShard.getEngine(); } + public static Translog getTranslog(IndexShard shard) { + return EngineTestCase.getTranslog(getEngine(shard)); + } + public static ReplicationTracker getReplicationTracker(IndexShard indexShard) { return indexShard.getReplicationTracker(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index d82b5052dbf54..12acd21903ec4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -76,6 +76,7 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -1158,7 +1159,7 @@ private void assertOpenTranslogReferences() throws Exception { for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { try { - indexShard.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + IndexShardTestCase.getTranslog(indexShard).getDeletionPolicy().assertNoOpenTranslogRefs(); } catch (AlreadyClosedException ok) { // all good }