From 45c6c2046729c49bb1cdacf35cadc0adc558e924 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 17 Apr 2018 08:03:41 -0400 Subject: [PATCH] Enforce translog access via engine (#29542) Today the translog of an engine is exposed and can be accessed directly. While this exposure offers much flexibility, it also causes these troubles: - Inconsistent behavior between translog method and engine method. For example, rolling a translog generation via an engine also trims unreferenced files, but translog's method does not. - An engine does not get notified when critical errors happen in translog as the access is direct. This change isolates translog of an engine and enforces all accesses to translog via the engine. --- .../org/elasticsearch/index/IndexService.java | 3 +- .../elasticsearch/index/engine/Engine.java | 62 ++++++++++++++++++- .../index/engine/InternalEngine.java | 2 +- .../seqno/GlobalCheckpointSyncAction.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 48 ++++++++++---- .../index/shard/PrimaryReplicaSyncer.java | 2 +- .../index/shard/RefreshListeners.java | 10 +-- .../recovery/RecoverySourceHandler.java | 13 ++-- .../index/IndexServiceTests.java | 6 +- .../ESIndexLevelReplicationTestCase.java | 2 +- .../IndexLevelReplicationTests.java | 8 +-- .../RecoveryDuringReplicationTests.java | 6 +- .../GlobalCheckpointSyncActionTests.java | 5 +- .../index/shard/IndexShardIT.java | 11 ++-- .../index/shard/IndexShardTests.java | 25 ++++---- .../index/shard/RefreshListenersTests.java | 2 +- .../PeerRecoveryTargetServiceTests.java | 4 +- .../indices/recovery/RecoveryTests.java | 22 +++---- .../index/engine/EngineTestCase.java | 7 +++ .../index/shard/IndexShardTestCase.java | 6 ++ .../test/InternalTestCluster.java | 3 +- 21 files changed, 170 insertions(+), 79 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index db724112574a2..585406d01a6f6 100644 --- 
a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -697,8 +697,7 @@ private void maybeFSyncTranslogs() { if (indexSettings.getTranslogDurability() == Translog.Durability.ASYNC) { for (IndexShard shard : this.shards.values()) { try { - Translog translog = shard.getTranslog(); - if (translog.syncNeeded()) { + if (shard.isSyncNeeded()) { shard.sync(); } } catch (AlreadyClosedException ex) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index fab8cba468b56..4c782cb500418 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -66,6 +66,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStats; import java.io.Closeable; import java.io.FileNotFoundException; @@ -510,8 +511,18 @@ public enum SearcherScope { EXTERNAL, INTERNAL } - /** returns the translog for this engine */ - public abstract Translog getTranslog(); + /** + * Returns the translog associated with this engine. + * Prefer to keep the translog package-private, so that an engine can control all accesses to the translog. + */ + abstract Translog getTranslog(); + + /** + * Checks if the underlying storage sync is required. + */ + public boolean isTranslogSyncNeeded() { + return getTranslog().syncNeeded(); + } /** * Ensures that all locations in the given stream have been written to the underlying storage. 
@@ -520,6 +531,36 @@ public enum SearcherScope { public abstract void syncTranslog() throws IOException; + public Closeable acquireTranslogRetentionLock() { + return getTranslog().acquireRetentionLock(); + } + + /** + * Creates a new translog snapshot from this engine for reading translog operations whose seq# is at least the provided seq#. + * The caller has to close the returned snapshot after it has finished reading. + */ + public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { + return getTranslog().newSnapshotFromMinSeqNo(minSeqNo); + } + + /** + * Returns the estimated number of translog operations in this engine whose seq# is at least the provided seq#. + */ + public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { + return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo); + } + + public TranslogStats getTranslogStats() { + return getTranslog().stats(); + } + + /** + * Returns the last location that the translog of this engine has written into. + */ + public Translog.Location getTranslogLastWriteLocation() { + return getTranslog().getLastWriteLocation(); + } + protected final void ensureOpen(Exception suppressed) { if (isClosed.get()) { AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get()); @@ -546,6 +587,13 @@ public CommitStats commitStats() { */ public abstract LocalCheckpointTracker getLocalCheckpointTracker(); + /** + * Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint) + */ + public long getLastSyncedGlobalCheckpoint() { + return getTranslog().getLastSyncedGlobalCheckpoint(); + } + /** * Global stats on segments. */ @@ -810,6 +858,16 @@ public final boolean refreshNeeded() { */ public abstract void trimTranslog() throws EngineException; + /** + * Tests whether or not the translog generation should be rolled to a new generation. 
+ * This test is based on the size of the current generation compared to the configured generation threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + public boolean shouldRollTranslogGeneration() { + return getTranslog().shouldRollGeneration(); + } + /** * Rolls the translog generation and cleans unneeded. */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index dcd1ba65d8950..b28a5cd59e25b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -422,7 +422,7 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy } @Override - public Translog getTranslog() { + Translog getTranslog() { ensureOpen(); return translog; } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 0ec03cb7a8f5e..9b55cff8cff9a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -131,7 +131,7 @@ protected ReplicaResult shardOperationOnReplica(final Request request, final Ind private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST && - indexShard.getTranslog().getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) { + indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) { indexShard.sync(); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 520115dc30a46..def6362e334e4 100644 --- 
a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -934,7 +934,7 @@ public FieldDataStats fieldDataStats(String... fields) { } public TranslogStats translogStats() { - return getEngine().getTranslog().stats(); + return getEngine().getTranslogStats(); } public CompletionStats completionStats(String... fields) { @@ -1331,7 +1331,7 @@ private boolean assertMaxUnsafeAutoIdInCommit() throws IOException { } protected void onNewEngine(Engine newEngine) { - refreshListeners.setTranslog(newEngine.getTranslog()); + refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); } /** @@ -1563,8 +1563,7 @@ boolean shouldRollTranslogGeneration() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - final Translog translog = engine.getTranslog(); - return translog.shouldRollGeneration(); + return engine.shouldRollTranslogGeneration(); } catch (final AlreadyClosedException e) { // we are already closed, no need to flush or roll } @@ -1579,9 +1578,26 @@ public void onSettingsChanged() { } } + /** + * Acquires a lock on the translog files, preventing them from being trimmed. + */ public Closeable acquireTranslogRetentionLock() { - Engine engine = getEngine(); - return engine.getTranslog().acquireRetentionLock(); + return getEngine().acquireTranslogRetentionLock(); + } + + /** + * Creates a new translog snapshot for reading translog operations whose seq# is at least the provided seq#. + * The caller has to close the returned snapshot after it has finished reading. + */ + public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { + return getEngine().newTranslogSnapshotFromMinSeqNo(minSeqNo); + } + + /** + * Returns the estimated number of operations in translog whose seq# is at least the provided seq#. 
+ */ + public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { + return getEngine().estimateTranslogOperationsFromMinSeq(minSeqNo); } public List segments(boolean verbose) { @@ -1592,10 +1608,6 @@ public void flushAndCloseEngine() throws IOException { getEngine().flushAndClose(); } - public Translog getTranslog() { - return getEngine().getTranslog(); - } - public String getHistoryUUID() { return getEngine().getHistoryUUID(); } @@ -1733,6 +1745,13 @@ public long getGlobalCheckpoint() { return replicationTracker.getGlobalCheckpoint(); } + /** + * Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint) + */ + public long getLastSyncedGlobalCheckpoint() { + return getEngine().getLastSyncedGlobalCheckpoint(); + } + /** * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. * @@ -2308,6 +2327,13 @@ public void sync() throws IOException { getEngine().syncTranslog(); } + /** + * Checks if the underlying storage sync is required. 
+ */ + public boolean isSyncNeeded() { + return getEngine().isTranslogSyncNeeded(); + } + /** * Returns the current translog durability mode */ @@ -2467,7 +2493,7 @@ final long getLastSearcherAccess() { } private void setRefreshPending(Engine engine) { - Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation(); + Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation(); Translog.Location location; do { location = this.pendingRefreshLocation.get(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 1e31eae7d417f..af8c9bdd0272f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -83,7 +83,7 @@ public void resync(final IndexShard indexShard, final ActionListener ActionListener resyncListener = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; - Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); + Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); resyncListener = new ActionListener() { @Override public void onResponse(final ResyncTask resyncTask) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java index 17e824eb046c7..d8a51d58ad956 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.IntSupplier; +import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -153,21 +154,20 @@ public int pendingCount() { /** * Setup 
the translog used to find the last refreshed location. */ - public void setTranslog(Translog translog) { - this.translog = translog; + public void setCurrentRefreshLocationSupplier(Supplier currentRefreshLocationSupplier) { + this.currentRefreshLocationSupplier = currentRefreshLocationSupplier; } - // Implementation of ReferenceManager.RefreshListener that adapts Lucene's RefreshListener into Elasticsearch's refresh listeners. - private Translog translog; /** * Snapshot of the translog location before the current refresh if there is a refresh going on or null. Doesn't have to be volatile * because when it is used by the refreshing thread. */ private Translog.Location currentRefreshLocation; + private Supplier currentRefreshLocationSupplier; @Override public void beforeRefresh() throws IOException { - currentRefreshLocation = translog.getLastWriteLocation(); + currentRefreshLocation = currentRefreshLocationSupplier.get(); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 710b4bc46e235..78f44ee723114 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -145,9 +145,6 @@ public RecoveryResponse recoverToTarget() throws IOException { }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered "); try (Closeable ignored = shard.acquireTranslogRetentionLock()) { - - final Translog translog = shard.getTranslog(); - final long startingSeqNo; final long requiredSeqNoRangeStart; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && @@ -170,7 +167,7 @@ public RecoveryResponse recoverToTarget() throws IOException { requiredSeqNoRangeStart = 
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; try { - phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); + phase1(phase1Snapshot.getIndexCommit(), () -> shard.estimateTranslogOperationsFromMinSeq(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { @@ -187,7 +184,7 @@ public RecoveryResponse recoverToTarget() throws IOException { try { // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); + prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, shard.estimateTranslogOperationsFromMinSeq(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } @@ -210,9 +207,9 @@ public RecoveryResponse recoverToTarget() throws IOException { logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); - logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); + logger.trace("snapshot translog for recovery; current size is [{}]", shard.estimateTranslogOperationsFromMinSeq(startingSeqNo)); final long targetLocalCheckpoint; - try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) { + try(Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) { targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); @@ -261,7 +258,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { // the start recovery request is initialized with the starting sequence number set 
to the target shard's local checkpoint plus one if (startingSeqNo - 1 <= localCheckpoint) { final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { + try (Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 5000af6688f83..4dc6a859a5c5a 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -250,7 +250,7 @@ public void testAsyncFsyncActuallyWorks() throws Exception { client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); IndexShard shard = indexService.getShard(0); assertBusy(() -> { - assertFalse(shard.getTranslog().syncNeeded()); + assertFalse(shard.isSyncNeeded()); }); } @@ -275,7 +275,7 @@ public void testRescheduleAsyncFsync() throws Exception { client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); assertNotNull(indexService.getFsyncTask()); final IndexShard shard = indexService.getShard(0); - assertBusy(() -> assertFalse(shard.getTranslog().syncNeeded())); + assertBusy(() -> assertFalse(shard.isSyncNeeded())); client() .admin() @@ -311,7 +311,7 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception { indexService.updateMetaData(metaData); IndexShard shard = indexService.getShard(0); - assertBusy(() -> assertThat(shard.getTranslog().totalOperations(), equalTo(0))); + assertBusy(() -> assertThat(shard.estimateTranslogOperationsFromMinSeq(0L), equalTo(0))); } public void 
testIllegalFsyncInterval() { diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 97fc1b528acf3..fd62318f96c3a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -681,7 +681,7 @@ class GlobalCheckpointSync extends ReplicationAction< @Override protected PrimaryResult performOnPrimary( final IndexShard primary, final GlobalCheckpointSyncAction.Request request) throws Exception { - primary.getTranslog().sync(); + primary.sync(); return new PrimaryResult(request, new ReplicationResponse()); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index baa56ee9585f6..2d2aaac7bbd26 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -330,7 +330,7 @@ public void testSeqNoCollision() throws Exception { final Translog.Operation op1; final List initOperations = new ArrayList<>(initDocs); - try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); for (int i = 0; i < initDocs; i++) { Translog.Operation op = snapshot.next(); @@ -347,7 +347,7 @@ public void testSeqNoCollision() throws Exception { shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed. 
shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); final Translog.Operation op2; - try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 2)); op2 = snapshot.next(); assertThat(op2.seqNo(), equalTo(op1.seqNo())); @@ -362,7 +362,7 @@ public void testSeqNoCollision() throws Exception { shards.promoteReplicaToPrimary(replica2); logger.info("--> Recover replica3 from replica2"); recoverReplica(replica3, replica2); - try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); assertThat(snapshot.next(), equalTo(op2)); assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); @@ -468,7 +468,7 @@ private static void assertNoOpTranslogOperationForDocumentFailure( long expectedPrimaryTerm, String failureMessage) throws IOException { for (IndexShard indexShard : replicationGroup) { - try(Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshot()) { + try(Translog.Snapshot snapshot = getTranslog(indexShard).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(expectedOperation)); long expectedSeqNo = 0L; Translog.Operation op = snapshot.next(); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index c7469f2432ad3..323b0364dfb93 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -128,7 +128,7 @@ public void testRecoveryOfDisconnectedReplica() throws 
Exception { shards.flush(); translogTrimmed = randomBoolean(); if (translogTrimmed) { - final Translog translog = shards.getPrimary().getTranslog(); + final Translog translog = getTranslog(shards.getPrimary()); translog.getDeletionPolicy().setRetentionAgeInMillis(0); translog.trimUnreferencedReaders(); } @@ -271,7 +271,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { // otherwise the deletion policy won't trim translog assertBusy(() -> { shards.syncGlobalCheckpoint(); - assertThat(newPrimary.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo())); + assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo())); }); newPrimary.flush(new FlushRequest()); uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); @@ -340,7 +340,7 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { // Index more docs - move the global checkpoint >= seqno of the stale operations. goodDocs += shards.indexDocs(scaledRandomIntBetween(staleDocs, staleDocs * 5)); shards.syncGlobalCheckpoint(); - assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo())); + assertThat(replica.getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo())); // Recover a replica again should also rollback the stale documents. 
shards.removeReplica(replica); replica.close("recover replica - second time", false); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index 70813531aeb0e..596575abc3025 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -90,9 +90,6 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { final Translog.Durability durability = randomFrom(Translog.Durability.ASYNC, Translog.Durability.REQUEST); when(indexShard.getTranslogDurability()).thenReturn(durability); - final Translog translog = mock(Translog.class); - when(indexShard.getTranslog()).thenReturn(translog); - final long globalCheckpoint = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE); final long lastSyncedGlobalCheckpoint; if (randomBoolean() && globalCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) { @@ -104,7 +101,7 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { } when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint); - when(translog.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint); + when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint); final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction( Settings.EMPTY, diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index f7ee54b32ee84..bc34aa60c4925 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -107,6 +107,7 @@ import static org.hamcrest.Matchers.containsString; import static 
org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog; public class IndexShardIT extends ESSingleNodeTestCase { @@ -167,7 +168,7 @@ public void testDurableFlagHasEffect() throws Exception { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); IndexShard shard = test.getShardOrNull(0); - Translog translog = ShardUtilsTests.getShardEngine(shard).getTranslog(); + Translog translog = getTranslog(shard); Predicate needsSync = (tlog) -> { // we can't use tlog.needsSync() here since it also takes the global checkpoint into account // we explicitly want to check here if our durability checks are taken into account so we only @@ -343,7 +344,7 @@ public void testMaybeFlush() throws Exception { SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); assertTrue(shard.shouldPeriodicallyFlush()); - final Translog translog = shard.getEngine().getTranslog(); + final Translog translog = getTranslog(shard); assertEquals(2, translog.stats().getUncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE).get(); @@ -384,7 +385,7 @@ public void testMaybeRollTranslogGeneration() throws Exception { final IndexService test = indicesService.indexService(resolveIndex("test")); final IndexShard shard = test.getShardOrNull(0); int rolls = 0; - final Translog translog = shard.getEngine().getTranslog(); + final Translog translog = getTranslog(shard); final long generation = translog.currentFileGeneration(); final int numberOfDocuments = randomIntBetween(32, 128); for (int i = 0; i < numberOfDocuments; i++) { @@ -454,11 +455,11 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { assertThat(shard.flushStats().getPeriodic(), equalTo(periodic + 1)); }; } else { - final long generation = shard.getEngine().getTranslog().currentFileGeneration(); + final long generation = getTranslog(shard).currentFileGeneration(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); check = () -> assertEquals( generation + 1, - shard.getEngine().getTranslog().currentFileGeneration()); + getTranslog(shard).currentFileGeneration()); } assertBusy(check); running.set(false); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5506bc515f24c..b608bc9cc5081 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -72,7 +72,6 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; @@ -518,7 +517,7 @@ public void onFailure(Exception e) { public void testPrimaryPromotionRollsGeneration() throws Exception { final IndexShard indexShard 
= newStartedShard(false); - final long currentTranslogGeneration = indexShard.getTranslog().getGeneration().translogFileGeneration; + final long currentTranslogGeneration = getTranslog(indexShard).getGeneration().translogFileGeneration; // promote the replica final ShardRouting replicaRouting = indexShard.routingEntry(); @@ -556,8 +555,8 @@ public void onFailure(Exception e) { ThreadPool.Names.GENERIC, ""); latch.await(); - assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(currentTranslogGeneration + 1)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); + assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, equalTo(currentTranslogGeneration + 1)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); closeShards(indexShard); } @@ -578,7 +577,7 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E true, ShardRoutingState.STARTED, replicaRouting.allocationId()); final long newPrimaryTerm = indexShard.getPrimaryTerm() + between(1, 1000); indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> { - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); }, 0L, Collections.singleton(indexShard.routingEntry().allocationId().getId()), new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(), @@ -669,7 +668,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { } final long primaryTerm = indexShard.getPrimaryTerm(); - final long translogGen = engineClosed ? -1 : indexShard.getTranslog().getGeneration().translogFileGeneration; + final long translogGen = engineClosed ? 
-1 : getTranslog(indexShard).getGeneration().translogFileGeneration; final Releasable operation1; final Releasable operation2; @@ -747,7 +746,7 @@ public void onFailure(Exception e) { @Override public void onResponse(Releasable releasable) { assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); onResponse.set(true); @@ -793,25 +792,25 @@ private void finish() { assertFalse(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(primaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm)); Releasables.close(operation1); // our operation should still be blocked assertFalse(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(primaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm)); Releasables.close(operation2); barrier.await(); // now lock acquisition should have succeeded assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); - assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); + assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); if (engineClosed) { assertFalse(onResponse.get()); assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class)); } else { assertTrue(onResponse.get()); assertNull(onFailure.get()); - 
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, equalTo(translogGen + 1)); assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); } @@ -1647,7 +1646,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException { assertEquals(1, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - try (Translog.Snapshot snapshot = newShard.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(newShard).newSnapshot()) { Translog.Operation operation; int numNoops = 0; while ((operation = snapshot.next()) != null) { @@ -2048,7 +2047,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { @Override public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); - assertFalse(replica.getTranslog().syncNeeded()); + assertFalse(replica.isSyncNeeded()); return localCheckpoint; } }, true); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 5803bf263633d..2d1c1d4e15af8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -133,7 +133,7 @@ indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilari (e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm); engine = new InternalEngine(config); 
engine.recoverFromTranslog(); - listeners.setTranslog(engine.getTranslog()); + listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } @After diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index d65d40e5bcdaa..91b35594772cf 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -59,7 +59,7 @@ public void testGetStartingSeqNo() throws Exception { } flushShard(replica); replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test"); - replica.getTranslog().sync(); + replica.sync(); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); recoveryTarget.decRef(); @@ -81,7 +81,7 @@ public void testGetStartingSeqNo() throws Exception { // Advances the global checkpoint, a safe commit also advances { replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test"); - replica.getTranslog().sync(); + replica.sync(); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs)); recoveryTarget.decRef(); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index f46ab7ebbd603..4e9d0ccb22e11 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -61,7 +61,7 @@ public void testTranslogHistoryTransferred() throws Exception { try (ReplicationGroup shards = 
createGroup(0)) { shards.startPrimary(); int docs = shards.indexDocs(10); - shards.getPrimary().getTranslog().rollGeneration(); + getTranslog(shards.getPrimary()).rollGeneration(); shards.flush(); if (randomBoolean()) { docs += shards.indexDocs(10); @@ -69,7 +69,7 @@ public void testTranslogHistoryTransferred() throws Exception { shards.addReplica(); shards.startAll(); final IndexShard replica = shards.getReplicas().get(0); - assertThat(replica.getTranslog().totalOperations(), equalTo(docs)); + assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(docs)); } } @@ -77,7 +77,7 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startPrimary(); shards.indexDocs(10); - shards.getPrimary().getTranslog().rollGeneration(); + getTranslog(shards.getPrimary()).rollGeneration(); shards.flush(); shards.indexDocs(10); final IndexShard replica = shards.addReplica(); @@ -99,7 +99,7 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception { releaseRecovery.countDown(); future.get(); // rolling/flushing is async - assertBusy(() -> assertThat(replica.getTranslog().totalOperations(), equalTo(0))); + assertBusy(() -> assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(0))); } } @@ -123,7 +123,7 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { // delete #1 orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL, u -> {}); - orgReplica.getTranslog().rollGeneration(); // isolate the delete in it's own generation + getTranslog(orgReplica).rollGeneration(); // isolate the delete in its own generation // index #0 orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON), u -> {}); @@ -167,7 +167,7 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { 
shards.recoverReplica(newReplica); shards.assertAllEqual(3); - assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps)); + assertThat(newReplica.estimateTranslogOperationsFromMinSeq(0), equalTo(translogOps)); } } @@ -184,7 +184,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { IndexShard replica = shards.getReplicas().get(0); final String historyUUID = replica.getHistoryUUID(); - Translog.TranslogGeneration translogGeneration = replica.getTranslog().getGeneration(); + Translog.TranslogGeneration translogGeneration = getTranslog(replica).getGeneration(); shards.removeReplica(replica); replica.close("test", false); IndexWriterConfig iwc = new IndexWriterConfig(null) @@ -219,7 +219,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { shards.recoverReplica(newReplica); // file based recovery should be made assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - assertThat(newReplica.getTranslog().totalOperations(), equalTo(numDocs)); + assertThat(newReplica.estimateTranslogOperationsFromMinSeq(0), equalTo(numDocs)); // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); @@ -238,7 +238,7 @@ public void testPeerRecoveryPersistGlobalCheckpoint() throws Exception { } final IndexShard replica = shards.addReplica(); shards.recoverReplica(replica); - assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(numDocs - 1)); + assertThat(replica.getLastSyncedGlobalCheckpoint(), equalTo(numDocs - 1)); } } @@ -291,7 +291,7 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { final IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); - try (Translog.Snapshot snapshot = newReplica.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = getTranslog(newReplica).newSnapshot()) { 
assertThat("Sequence based recovery should keep existing translog", snapshot, SnapshotMatchers.size(initDocs + moreDocs)); } assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedDocs + moreDocs)); @@ -321,7 +321,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { shards.recoverReplica(replica); // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false) assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); - assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs)); + assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(numDocs)); shards.assertAllEqual(numDocs); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index dea92c2927d86..8fff17900b072 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -493,4 +493,11 @@ protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, startTime); } + + /** + * Exposes a translog associated with the given engine for testing purposes. 
+ */ + public static Translog getTranslog(Engine engine) { + return engine.getTranslog(); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 0d535d9af3851..a0e1cfc334110 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -55,6 +55,7 @@ import org.elasticsearch.index.cache.query.DisabledQueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -66,6 +67,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; @@ -643,6 +645,10 @@ public static Engine getEngine(IndexShard indexShard) { return indexShard.getEngine(); } + public static Translog getTranslog(IndexShard shard) { + return EngineTestCase.getTranslog(getEngine(shard)); + } + public static ReplicationTracker getReplicationTracker(IndexShard indexShard) { return indexShard.getReplicationTracker(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index d82b5052dbf54..12acd21903ec4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ 
b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -76,6 +76,7 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -1158,7 +1159,7 @@ private void assertOpenTranslogReferences() throws Exception { for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { try { - indexShard.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + IndexShardTestCase.getTranslog(indexShard).getDeletionPolicy().assertNoOpenTranslogRefs(); } catch (AlreadyClosedException ok) { // all good }