From 8de46596aa64a97b4a6fc5d2a3479692eb896201 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 21 Jun 2018 09:30:08 -0400 Subject: [PATCH] Use Lucene soft-deletes in peer recovery (#30522) This commit adds Lucene soft-deletes as another source for peer-recovery besides translog. Relates #29530 --- docs/reference/indices/flush.asciidoc | 3 +- .../index/engine/CombinedDeletionPolicy.java | 12 +- .../elasticsearch/index/engine/Engine.java | 24 +++- .../index/engine/InternalEngine.java | 111 +++++++++++---- .../index/engine/LuceneChangesSnapshot.java | 2 +- .../index/engine/SoftDeletesPolicy.java | 120 ++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 29 +++- .../index/shard/PrimaryReplicaSyncer.java | 1 + .../index/translog/TranslogWriter.java | 21 ++- .../recovery/RecoverySourceHandler.java | 52 ++----- .../gateway/RecoveryFromGatewayIT.java | 17 ++- .../index/IndexServiceTests.java | 3 +- .../engine/CombinedDeletionPolicyTests.java | 77 ++++++++--- .../index/engine/InternalEngineTests.java | 128 +++++++++++++++--- .../engine/LuceneChangesSnapshotTests.java | 2 +- .../index/engine/SoftDeletesPolicyTests.java | 75 ++++++++++ .../IndexLevelReplicationTests.java | 66 +++++---- .../RecoveryDuringReplicationTests.java | 17 ++- .../index/shard/IndexShardTests.java | 8 +- .../recovery/RecoverySourceHandlerTests.java | 6 - .../indices/recovery/RecoveryTests.java | 24 ++-- .../indices/stats/IndexStatsIT.java | 6 +- .../index/engine/EngineTestCase.java | 7 +- .../ESIndexLevelReplicationTestCase.java | 23 +++- .../index/shard/IndexShardTestCase.java | 7 +- 25 files changed, 652 insertions(+), 189 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java create mode 100644 server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index db1f7c2fe00a9..0e7833a6f847e 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -101,7 +101,8 @@ which returns something similar to: "translog_generation" : "2", "max_seq_no" : "-1", "sync_id" : "AVvFY-071siAOuFGEO9P", <1> - "max_unsafe_auto_id_timestamp" : "-1" + "max_unsafe_auto_id_timestamp" : "-1", + "min_retained_seq_no": "0" }, "num_docs" : 0 } diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 774224cd29c18..ddb374fa50cf1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -46,14 +46,17 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final Logger logger; private final TranslogDeletionPolicy translogDeletionPolicy; + private final SoftDeletesPolicy softDeletesPolicy; private final LongSupplier globalCheckpointSupplier; private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. 
private volatile IndexCommit lastCommit; // the most recent commit point - CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) { + CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, + SoftDeletesPolicy softDeletesPolicy, LongSupplier globalCheckpointSupplier) { this.logger = logger; this.translogDeletionPolicy = translogDeletionPolicy; + this.softDeletesPolicy = softDeletesPolicy; this.globalCheckpointSupplier = globalCheckpointSupplier; this.snapshottedCommits = new ObjectIntHashMap<>(); } @@ -80,7 +83,7 @@ public synchronized void onCommit(List commits) throws IO deleteCommit(commits.get(i)); } } - updateTranslogDeletionPolicy(); + updateRetentionPolicy(); } private void deleteCommit(IndexCommit commit) throws IOException { @@ -90,7 +93,7 @@ private void deleteCommit(IndexCommit commit) throws IOException { assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed"; } - private void updateTranslogDeletionPolicy() throws IOException { + private void updateRetentionPolicy() throws IOException { assert Thread.holdsLock(this); logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit)); assert safeCommit.isDeleted() == false : "The safe commit must not be deleted"; @@ -101,6 +104,9 @@ private void updateTranslogDeletionPolicy() throws IOException { assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen"; translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen); translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); + + softDeletesPolicy.setLocalCheckpointOfSafeCommit( + Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))); } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index a07ed4e3177c9..d5cd8bad22832 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -98,6 +98,7 @@ public abstract class Engine implements Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; public static final String HISTORY_UUID_KEY = "history_uuid"; + public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no"; protected final ShardId shardId; protected final String allocationId; @@ -578,7 +579,10 @@ public enum SearcherScope { public abstract void syncTranslog() throws IOException; - public abstract Closeable acquireTranslogRetentionLock(); + /** + * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed + */ + public abstract Closeable acquireRetentionLockForPeerRecovery(); /** * Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range. @@ -586,11 +590,6 @@ public enum SearcherScope { */ public abstract Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException; - /** - * Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#. 
- */ - public abstract int estimateTranslogOperationsFromMinSeq(long minSeqNo); - public abstract TranslogStats getTranslogStats(); /** @@ -604,6 +603,19 @@ public enum SearcherScope { public abstract Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService, long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException; + /** + * Creates a new history snapshot for reading operations since the provided seqno. + * The returned snapshot can be retrieved from either Lucene index or translog files. + */ + public abstract Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException; + + /** + * Returns the estimated number of history operations whose seq# is at least the provided seq# in this engine. + */ + public abstract int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException; + + public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException; + protected final void ensureOpen(Exception suppressed) { if (isClosed.get()) { AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get()); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2bb3aa54b6303..c7b77a4e9337d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -22,7 +22,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.document.Field; -import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; @@ -38,7 +37,6 @@ import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; @@ -157,6 +155,7 @@ public class InternalEngine extends Engine { private final CounterMetric numDocUpdates = new CounterMetric(); private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField(); private final boolean softDeleteEnabled; + private final SoftDeletesPolicy softDeletesPolicy; private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; /** @@ -182,7 +181,6 @@ public InternalEngine(EngineConfig engineConfig) { maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); } this.uidField = engineConfig.getIndexSettings().isSingleType() ?
IdFieldMapper.NAME : UidFieldMapper.NAME; - this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled(); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() ); @@ -204,8 +202,10 @@ public InternalEngine(EngineConfig engineConfig) { assert translog.getGeneration() != null; this.translog = translog; this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); + this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled(); + this.softDeletesPolicy = newSoftDeletesPolicy(); this.combinedDeletionPolicy = - new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint); + new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint); writer = createWriter(); bootstrapAppendOnlyInfoFromWriter(writer); historyUUID = loadHistoryUUID(writer); @@ -262,6 +262,18 @@ private LocalCheckpointTracker createLocalCheckpointTracker( return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); } + private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { + final Map commitUserData = store.readLastCommittedSegmentsInfo().userData; + final long lastMinRetainedSeqNo; + if (commitUserData.containsKey(Engine.MIN_RETAINED_SEQNO)) { + lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(Engine.MIN_RETAINED_SEQNO)); + } else { + lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1; + } + return new SoftDeletesPolicy(translog::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo, + engineConfig.getIndexSettings().getSoftDeleteRetentionOperations()); + } + /** * This reference manager delegates all its refresh calls to another (internal) SearcherManager * The main purpose for this is that if we have external refreshes happening we don't issue extra @@ -481,18 +493,39 @@ public void syncTranslog() throws IOException { } @Override - public Closeable acquireTranslogRetentionLock() { - return getTranslog().acquireRetentionLock(); + public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException { + return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo); } + /** + * Creates a new history snapshot for reading operations since the provided seqno. + * The returned snapshot can be retrieved from either Lucene index or translog files. + */ @Override - public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException { - return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo); + public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + return newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); + } else { + return getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE); + } } + /** + * Returns the estimated number of history operations whose seq# is at least the provided seq# in this engine.
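+ * If soft-deletes is enabled, the estimate is taken from a Lucene changes snapshot; otherwise it is taken from the translog.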
+ */ @Override - public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { - return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo); + public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + try (Translog.Snapshot snapshot = + newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false)) { + return snapshot.totalOperations(); + } catch (IOException ex) { + maybeFailEngine(source, ex); + throw ex; + } + } else { + return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo); + } } @Override @@ -2127,8 +2160,8 @@ private IndexWriterConfig getIndexWriterConfig() { MergePolicy mergePolicy = config().getMergePolicy(); if (softDeleteEnabled) { iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD); - mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, this::softDeletesRetentionQuery, - new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy)); + mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery, + new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, softDeletesPolicy::getRetentionQuery, mergePolicy)); } iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); @@ -2141,20 +2174,6 @@ private IndexWriterConfig getIndexWriterConfig() { return iwc; } - /** - * Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges. - */ - private Query softDeletesRetentionQuery() { - ensureOpen(); - // TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit. - final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(); - // Prefer using the global checkpoint which is persisted on disk than an in-memory value. - // If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops, - // then we may not have all required operations whose seq# greater than the global checkpoint after restarted. 
- final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); - return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE); - } - /** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */ static final class SearchFactory extends EngineSearcherFactory { private final Engine.Warmer warmer; @@ -2341,6 +2360,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); + if (softDeleteEnabled) { + commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); + } logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); @@ -2396,6 +2418,8 @@ public void onSettingsChanged() { final IndexSettings indexSettings = engineConfig.getIndexSettings(); translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes()); + + softDeletesPolicy.setRetentionOperations(indexSettings.getSoftDeleteRetentionOperations()); } public MergeStats getMergeStats() { @@ -2509,6 +2533,41 @@ public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService m } } + @Override + public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + return getMinRetainedSeqNo() <= startingSeqNo; + } else { + final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint(); + final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); + try (Translog.Snapshot snapshot = getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE)) { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + tracker.markSeqNoAsCompleted(operation.seqNo()); + } + } + } + return tracker.getCheckpoint() >= currentLocalCheckpoint; + } + } + + /** + * Returns the minimum seqno that is retained in the Lucene index. + * Operations whose seq# are at least this value should exist in the Lucene index. 
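+ * This value is maintained by the engine's {@link SoftDeletesPolicy} and only advances while no retention lock is held.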
+ */ + final long getMinRetainedSeqNo() { + assert softDeleteEnabled : Thread.currentThread().getName(); + return softDeletesPolicy.getMinRetainedSeqNo(); + } + + @Override + public Closeable acquireRetentionLockForPeerRecovery() { + final Closeable translogLock = translog.acquireRetentionLock(); + final Releasable softDeletesLock = softDeletesPolicy.acquireRetentionLock(); + return () -> IOUtils.close(translogLock, softDeletesLock); + } + @Override public boolean isRecovering() { return pendingTranslogRecovery.get(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index c463e9fc2f3e5..2caf1e67a9ebd 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -116,7 +116,7 @@ public int totalOperations() { } @Override - public int overriddenOperations() { + public int skippedOperations() { return skippedOperations; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java new file mode 100644 index 0000000000000..af2ded8c46620 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -0,0 +1,120 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.search.Query; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.translog.Translog; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; + +/** + * A policy that controls how many soft-deleted documents should be retained, both for peer recovery and for querying the history of changes. + */ +final class SoftDeletesPolicy { + private final LongSupplier globalCheckpointSupplier; + private long localCheckpointOfSafeCommit; + // This lock count is used to prevent `minRetainedSeqNo` from advancing. + private int retentionLockCount; + // The number of extra operations before the global checkpoint to be retained + private long retentionOperations; + // The min seq_no value that is retained - ops at or after this seq# should exist in the Lucene index.
+ private long minRetainedSeqNo; + + SoftDeletesPolicy(LongSupplier globalCheckpointSupplier, long minRetainedSeqNo, long retentionOperations) { + this.globalCheckpointSupplier = globalCheckpointSupplier; + this.retentionOperations = retentionOperations; + this.minRetainedSeqNo = minRetainedSeqNo; + this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; + this.retentionLockCount = 0; + } + + /** + * Updates the number of soft-deleted documents prior to the global checkpoint to be retained. + * See {@link org.elasticsearch.index.IndexSettings#INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING} + */ + synchronized void setRetentionOperations(long retentionOperations) { + this.retentionOperations = retentionOperations; + } + + /** + * Sets the local checkpoint of the current safe commit + */ + synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) { + if (newCheckpoint < this.localCheckpointOfSafeCommit) { + throw new IllegalArgumentException("Local checkpoint can't go backwards; " + + "new checkpoint [" + newCheckpoint + "], " + "current checkpoint [" + localCheckpointOfSafeCommit + "]"); + } + this.localCheckpointOfSafeCommit = newCheckpoint; + } + + /** + * Acquires a lock on soft-deleted documents to prevent them from being cleaned up by merges. This is necessary to + * make sure that all operations that are being retained will be retained until the lock is released. + * This is analogous to the translog's retention lock; see {@link Translog#acquireRetentionLock()} + */ + synchronized Releasable acquireRetentionLock() { + assert retentionLockCount >= 0 : "Invalid number of retention locks [" + retentionLockCount + "]"; + retentionLockCount++; + final AtomicBoolean released = new AtomicBoolean(); + return () -> { + if (released.compareAndSet(false, true)) { + releaseRetentionLock(); + } + }; + } + + private synchronized void releaseRetentionLock() { + assert retentionLockCount > 0 : "Invalid number of retention locks [" + retentionLockCount + "]"; + retentionLockCount--; + } + + /** + * Returns the min seqno that is retained in the Lucene index. + * Operations whose seq# is at least this value should exist in the Lucene index. + */ + synchronized long getMinRetainedSeqNo() { + // Do not advance if the retention lock is held + if (retentionLockCount == 0) { + // This policy retains operations for two purposes: peer-recovery and querying changes history. + // - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit, + // then sends ops after the local checkpoint of that commit. This requires keeping all ops after localCheckpointOfSafeCommit; + // - Changes APIs are driven by the combination of the global checkpoint and retention ops. Here we prefer using the global + // checkpoint instead of max_seqno because only operations up to the global checkpoint are exposed in the changes APIs. + final long minSeqNoForQueryingChanges = globalCheckpointSupplier.getAsLong() - retentionOperations; + final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1; + // minSeqNoToRetain can go backwards (e.g. if retentionOperations is increased in settings), so we take the max below to make sure minRetainedSeqNo never decreases.
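+ // Worked example (hypothetical values): global checkpoint = 100, retentionOperations = 30, localCheckpointOfSafeCommit = 80 + // => minSeqNoForQueryingChanges = 100 - 30 = 70 and minSeqNoToRetain = min(70, 80) + 1 = 71, i.e. ops with seq# >= 71 are kept.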
+ minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain); + } + return minRetainedSeqNo; + } + + /** + * Returns a soft-deletes retention query that will be used in {@link org.apache.lucene.index.SoftDeletesRetentionMergePolicy} + * Soft-deleted documents (including tombstones) that match this query will be retained and won't be cleaned up by merges. + */ + Query getRetentionQuery() { + return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a77ebac49bfab..6ef559c5adeb8 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1628,10 +1628,10 @@ public void onSettingsChanged() { } /** - * Acquires a lock on the translog files, preventing them from being trimmed. + * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public Closeable acquireTranslogRetentionLock() { - return getEngine().acquireTranslogRetentionLock(); + public Closeable acquireRetentionLockForPeerRecovery() { + return getEngine().acquireRetentionLockForPeerRecovery(); } /** @@ -1639,14 +1639,31 @@ public Closeable acquireTranslogRetentionLock() { * The caller has to close the returned snapshot after finishing the reading. */ public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { + // TODO: Remove this method after primary-replica resync uses soft-deletes return getEngine().newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE); } /** - * Returns the estimated number of operations in translog whose seq# at least the provided seq#. + * Returns the estimated number of history operations whose seq# is at least the provided seq# in this shard. */ - public int estimateTranslogOperationsFromMinSeq(long minSeqNo) { - return getEngine().estimateTranslogOperationsFromMinSeq(minSeqNo); + public int estimateNumberOfHistoryOperations(String source, long startingSeqNo) throws IOException { + return getEngine().estimateNumberOfHistoryOperations(source, mapperService, startingSeqNo); + } + + /** + * Creates a new history snapshot for reading operations since the provided starting seqno (inclusive). + * The returned snapshot can be retrieved from either Lucene index or translog files. + */ + public Translog.Snapshot getHistoryOperations(String source, long startingSeqNo) throws IOException { + return getEngine().readHistoryOperations(source, mapperService, startingSeqNo); + } + + /** + * Checks if we have a complete history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; see {@link #acquireRetentionLockForPeerRecovery()} + */ + public boolean hasCompleteHistoryOperations(String source, long startingSeqNo) throws IOException { + return getEngine().hasCompleteOperationHistory(source, mapperService, startingSeqNo); } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index b39ebd51f2bc8..22fc3c4575ea6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -83,6 +83,7 @@ public void resync(final IndexShard indexShard, final ActionListener ActionListener resyncListener = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; + // TODO: A follow-up to make resync use soft-deletes Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); resyncListener = new ActionListener() { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index b89b21c52588a..85f9da89917df 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -40,6 +40,7 @@ import java.nio.file.StandardOpenOption; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; @@ -202,9 +203,23 @@ private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReferenc if (previous.v1().equals(data) == false) { Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput())); Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput())); - throw new AssertionError( - "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " + - "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); + // TODO: Lucene does not yet keep the timestamp of Index operations, so we loosen this check to compare operations without their timestamps. + final boolean sameOp; + if (newOp instanceof Translog.Index && prvOp instanceof Translog.Index) { + final Translog.Index o1 = (Translog.Index) newOp; + final Translog.Index o2 = (Translog.Index) prvOp; + sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type()) + && Objects.equals(o1.source(), o2.source()) && Objects.equals(o1.routing(), o2.routing()) + && o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo() + && o1.version() == o2.version() && o1.versionType() == o2.versionType(); + } else { + sameOp = false; + } + if (sameOp == false) { + throw new AssertionError( + "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data.
" + + "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); + } } } else { seenSequenceNumbers.put(seqNo, diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 45500349865f7..b504cf40f5817 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -146,11 +146,11 @@ public RecoveryResponse recoverToTarget() throws IOException { assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); - try (Closeable ignored = shard.acquireTranslogRetentionLock()) { + try (Closeable ignored = shard.acquireRetentionLockForPeerRecovery()) { final long startingSeqNo; final long requiredSeqNoRangeStart; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && - isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery(); + isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()); if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); @@ -164,12 +164,13 @@ public RecoveryResponse recoverToTarget() throws IOException { } // we set this to 0 to create a translog roughly according to the retention policy // on the target. Note that it will still filter out legacy operations with no sequence numbers - startingSeqNo = 0; + startingSeqNo = 0; //TODO: A follow-up to send only ops above the local checkpoint if soft-deletes enabled. 
// but we must have everything above the local checkpoint in the commit requiredSeqNoRangeStart = Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; try { - phase1(phase1Snapshot.getIndexCommit(), () -> shard.estimateTranslogOperationsFromMinSeq(startingSeqNo)); + final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); + phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { @@ -186,7 +187,8 @@ public RecoveryResponse recoverToTarget() throws IOException { try { // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, shard.estimateTranslogOperationsFromMinSeq(startingSeqNo)); + prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, + shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } @@ -207,11 +209,13 @@ public RecoveryResponse recoverToTarget() throws IOException { */ cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); - logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); - - logger.trace("snapshot translog for recovery; current size is [{}]", shard.estimateTranslogOperationsFromMinSeq(startingSeqNo)); + if (logger.isTraceEnabled()) { + logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); + logger.trace("snapshot translog for recovery; current size is [{}]", + shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); + } final long targetLocalCheckpoint; - try(Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) { + try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) { targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); @@ -268,36 +272,6 @@ public void onFailure(Exception e) { }); } - /** - * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source - * translog contains all operations above the local checkpoint on the target. We already know the that translog contains or will contain - * all ops above the source local checkpoint, so we can stop check there. 
- * - * @return {@code true} if the source is ready for a sequence-number-based recovery - * @throws IOException if an I/O exception occurred reading the translog snapshot - */ - boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { - final long startingSeqNo = request.startingSeqNo(); - assert startingSeqNo >= 0; - final long localCheckpoint = shard.getLocalCheckpoint(); - logger.trace("testing sequence numbers in range: [{}, {}]", startingSeqNo, localCheckpoint); - // the start recovery request is initialized with the starting sequence number set to the target shard's local checkpoint plus one - if (startingSeqNo - 1 <= localCheckpoint) { - final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = shard.newTranslogSnapshotFromMinSeqNo(startingSeqNo)) { - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - tracker.markSeqNoAsCompleted(operation.seqNo()); - } - } - } - return tracker.getCheckpoint() >= localCheckpoint; - } else { - return false; - } - } - /** * Perform phase1 of the recovery operations. Once this {@link IndexCommit} * snapshot has been performed no commit operations (files being fsync'd) diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 154d702e7fb77..861ec78d21f39 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -396,7 +397,8 @@ public void testReuseInFileBasedPeerRecovery() throws Exception { .get(); logger.info("--> indexing docs"); - for (int i = 0; i < randomIntBetween(1, 1024); i++) { + int numDocs = randomIntBetween(1, 1024); + for (int i = 0; i < numDocs; i++) { client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet(); } @@ -418,12 +420,15 @@ public void testReuseInFileBasedPeerRecovery() throws Exception { } logger.info("--> restart replica node"); + boolean softDeleteEnabled = internalCluster().getInstance(IndicesService.class, primaryNode) + .indexServiceSafe(resolveIndex("test")).getShard(0).indexSettings().isSoftDeleteEnabled(); + int moreDocs = randomIntBetween(1, 1024); internalCluster().restartNode(replicaNode, new RestartCallback() { @Override public Settings onNodeStopped(String nodeName) throws Exception { // index some more documents; we expect to reuse the files that already exist on the replica - for (int i = 0; i < randomIntBetween(1, 1024); i++) { + for (int i = 0; i < moreDocs; i++) { client(primaryNode).prepareIndex("test", "type").setSource("field", "value").execute().actionGet(); } @@ -431,8 +436,12 @@ public Settings onNodeStopped(String nodeName) throws Exception { client(primaryNode).admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") + 
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0) ).get(); client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get(); + if (softDeleteEnabled) { // We need an extra flush to advance the min_retained_seqno of the SoftDeletesPolicy + client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get(); + } return super.onNodeStopped(nodeName); } }); @@ -472,7 +481,9 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused)); assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered)); assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0)); - assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0)); + // both cases will be zero once we start sending only ops after the local checkpoint of the safe commit + int expectedTranslogOps = softDeleteEnabled ? numDocs + moreDocs : 0; + assertThat("mismatch in recovered translog ops", recoveryState.getTranslog().recoveredOperations(), equalTo(expectedTranslogOps)); } } } diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 50719a1c3fed0..18fd9b08a5101 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -289,7 +290,7 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception { .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1)) .get(); IndexShard shard = indexService.getShard(0); - assertBusy(() -> assertThat(shard.estimateTranslogOperationsFromMinSeq(0L), equalTo(0))); + assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0))); } public void testIllegalFsyncInterval() { diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 5ea55183c610e..d74b9b41a8867 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -52,20 +52,24 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); + final int extraRetainedOps = between(0, 100); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); final
LongArrayList maxSeqNoList = new LongArrayList(); final LongArrayList translogGenList = new LongArrayList(); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = 0; + long lastCheckpoint = lastMaxSeqNo; long lastTranslogGen = 0; final UUID translogUUID = UUID.randomUUID(); for (int i = 0; i < totalCommits; i++) { lastMaxSeqNo += between(1, 10000); + lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); lastTranslogGen += between(1, 100); - commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); maxSeqNoList.add(lastMaxSeqNo); translogGenList.add(lastTranslogGen); } @@ -86,14 +90,19 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { } assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGenList.get(keptIndex))); assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(softDeletesPolicy.getMinRetainedSeqNo(), + equalTo(Math.min(getLocalCheckpoint(commitList.get(keptIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))); } public void testAcquireIndexCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); + final int extraRetainedOps = between(0, 100); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); long lastMaxSeqNo = between(1, 1000); + long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); long lastTranslogGen = between(1, 20); int safeIndex = 0; List commitList = new ArrayList<>(); @@ -103,8 +112,9 @@ public void testAcquireIndexCommit() throws Exception { int newCommits = between(1, 10); for (int n = 0; n < newCommits; n++) { lastMaxSeqNo += between(1, 1000); + lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); lastTranslogGen += between(1, 20); - commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); } // Advance the global checkpoint to between [safeIndex, safeIndex + 1) safeIndex = randomIntBetween(safeIndex, commitList.size() - 1); @@ -115,6 +125,9 @@ public void testAcquireIndexCommit() throws Exception { globalCheckpoint.set(randomLongBetween(lower, upper)); commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); + IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); + assertThat(softDeletesPolicy.getMinRetainedSeqNo(), + equalTo(Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))); // Captures and releases some commits int captures = between(0, 5); for (int n = 0; n < captures; n++) { @@ -133,7 +146,7 @@ public void testAcquireIndexCommit() throws Exception { snapshottingCommits.remove(snapshot); final long pendingSnapshots = snapshottingCommits.stream().filter(snapshot::equals).count(); final IndexCommit lastCommit = commitList.get(commitList.size() - 1); - final IndexCommit safeCommit = 
CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); + safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); assertThat(indexPolicy.releaseCommit(snapshot), equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false)); } @@ -144,6 +157,8 @@ public void testAcquireIndexCommit() throws Exception { equalTo(Long.parseLong(commitList.get(safeIndex).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(Long.parseLong(commitList.get(commitList.size() - 1).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + assertThat(softDeletesPolicy.getMinRetainedSeqNo(), + equalTo(Math.min(getLocalCheckpoint(commitList.get(safeIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))); } snapshottingCommits.forEach(indexPolicy::releaseCommit); globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); @@ -155,25 +170,27 @@ public void testAcquireIndexCommit() throws Exception { assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false)); assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); + assertThat(softDeletesPolicy.getMinRetainedSeqNo(), + equalTo(Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))); } public void testLegacyIndex() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); long legacyTranslogGen = randomNonNegativeLong(); IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen); - indexPolicy.onCommit(singletonList(legacyCommit)); - verify(legacyCommit, never()).delete(); - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen)); + assertThat(CombinedDeletionPolicy.findSafeCommitPoint(singletonList(legacyCommit), globalCheckpoint.get()), + equalTo(legacyCommit)); long safeTranslogGen = randomLongBetween(legacyTranslogGen, Long.MAX_VALUE); long maxSeqNo = randomLongBetween(1, Long.MAX_VALUE); - final IndexCommit freshCommit = mockIndexCommit(maxSeqNo, translogUUID, safeTranslogGen); + final IndexCommit freshCommit = mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, translogUUID, safeTranslogGen); globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1)); indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit)); @@ -190,20 +207,23 @@ public void testLegacyIndex() throws Exception { verify(freshCommit, times(0)).delete(); assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen)); assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen)); + assertThat(softDeletesPolicy.getMinRetainedSeqNo(), 
equalTo(getLocalCheckpoint(freshCommit) + 1)); } public void testKeepSingleNoOpsCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomLong()); final UUID translogUUID = UUID.randomUUID(); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); final List commitList = new ArrayList<>(); final int numOfNoOpsCommits = between(1, 10); long lastNoopTranslogGen = 0; for (int i = 0; i < numOfNoOpsCommits; i++) { lastNoopTranslogGen += between(1, 20); - commitList.add(mockIndexCommit(SequenceNumbers.NO_OPS_PERFORMED, translogUUID, lastNoopTranslogGen)); + commitList.add( + mockIndexCommit(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, translogUUID, lastNoopTranslogGen)); } // Keep only one no_ops commit. indexPolicy.onCommit(commitList); @@ -220,7 +240,7 @@ public void testKeepSingleNoOpsCommits() throws Exception { for (int i = 0; i < numOfGoodCommits; i++) { maxSeqNo += between(1, 1000); lastTranslogGen += between(1, 20); - commitList.add(mockIndexCommit(maxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(maxSeqNo, maxSeqNo, translogUUID, lastTranslogGen)); } // If the global checkpoint is still unassigned, we should still keep one NO_OPS_PERFORMED commit. globalCheckpoint.set(SequenceNumbers.UNASSIGNED_SEQ_NO); @@ -244,21 +264,27 @@ public void testKeepSingleNoOpsCommits() throws Exception { public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); final int invalidCommits = between(1, 10); final List commitList = new ArrayList<>(); for (int i = 0; i < invalidCommits; i++) { - commitList.add(mockIndexCommit(randomNonNegativeLong(), UUID.randomUUID(), randomNonNegativeLong())); + long maxSeqNo = randomNonNegativeLong(); + commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID(), randomNonNegativeLong())); } final UUID expectedTranslogUUID = UUID.randomUUID(); long lastTranslogGen = 0; final int validCommits = between(1, 10); + long lastMaxSeqNo = between(1, 1000); + long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); for (int i = 0; i < validCommits; i++) { lastTranslogGen += between(1, 1000); - commitList.add(mockIndexCommit(randomNonNegativeLong(), expectedTranslogUUID, lastTranslogGen)); + lastMaxSeqNo += between(1, 1000); + lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID, lastTranslogGen)); } // We should never keep invalid commits regardless of the value of the global checkpoint. 
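+ // (An invalid commit in this test is one whose translog UUID does not match the current translog's UUID.)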
@@ -266,21 +292,26 @@ public void testDeleteInvalidCommits() throws Exception { for (int i = 0; i < invalidCommits - 1; i++) { verify(commitList.get(i), times(1)).delete(); } + assertThat(softDeletesPolicy.getMinRetainedSeqNo(), + equalTo(getLocalCheckpoint(CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get())) + 1)); } public void testCheckUnreferencedCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); final UUID translogUUID = UUID.randomUUID(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); - CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); + long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); long lastTranslogGen = between(1, 50); for (int i = 0; i < totalCommits; i++) { lastMaxSeqNo += between(1, 10000); lastTranslogGen += between(1, 100); - commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen)); + lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); } IndexCommit safeCommit = randomFrom(commitList); globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO))); @@ -307,8 +338,9 @@ public void testCheckUnreferencedCommits() throws Exception { } } - IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { + IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { final Map userData = new HashMap<>(); + userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); @@ -329,6 +361,10 @@ void resetDeletion(IndexCommit commit) { }).when(commit).delete(); } + private long getLocalCheckpoint(IndexCommit commit) throws IOException { + return Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + } + IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException { final Map userData = new HashMap<>(); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); @@ -338,4 +374,5 @@ IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IO resetDeletion(commit); return commit; } + } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index cdda55dd54f65..4ceb70313509a 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -127,6 +127,7 @@ import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; +import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.charset.Charset; @@ -253,9 +254,13 @@ public void 
testVersionMapAfterAutoIDDocument() throws IOException { } public void testSegments() throws Exception { - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + Settings settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); try (Store store = createStore(); - InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get))) { + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) { List segments = engine.segments(false); assertThat(segments.isEmpty(), equalTo(true)); assertThat(engine.segmentsStats(false).getCount(), equalTo(0L)); @@ -327,8 +332,6 @@ public void testSegments() throws Exception { engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get())); - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); - engine.getTranslog().sync(); engine.refresh("test"); segments = engine.segments(false); @@ -1360,18 +1363,27 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { engine.index(indexForDoc(doc)); liveDocs.add(doc.id()); } + if (randomBoolean()) { + engine.flush(randomBoolean(), true); + } } + engine.flush(); + long localCheckpoint = engine.getLocalCheckpoint(); globalCheckpoint.set(randomLongBetween(0, localCheckpoint)); - engine.getTranslog().sync(); + engine.syncTranslog(); + final long safeCommitCheckpoint; + try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { + safeCommitCheckpoint = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + } engine.forceMerge(true, 1, false, false, false); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); Map ops = readAllOperationsInLucene(engine, mapperService) .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); for (long seqno = 0; seqno <= localCheckpoint; seqno++) { - long keptIndex = globalCheckpoint.get() + 1 - retainedExtraOps; + long minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitCheckpoint + 1); String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]"; - if (seqno < keptIndex) { + if (seqno < minSeqNoToRetain) { Translog.Operation op = ops.get(seqno); if (op != null) { assertThat(op, instanceOf(Translog.Index.class)); @@ -1384,8 +1396,10 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { } settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); + engine.onSettingsChanged(); globalCheckpoint.set(localCheckpoint); - engine.getTranslog().sync(); + engine.syncTranslog(); + engine.forceMerge(true, 1, false, false, false); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size())); @@ -1414,7 +1428,6 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc engine.index(indexForDoc(doc)); liveDocs.add(doc.id()); } - engine.flush(); for 
(int i = 0; i < numDocs; i++) {
 ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null, randomBoolean() || omitSourceAllTheTime);
@@ -1426,23 +1439,31 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc engine.index(indexForDoc(doc)); liveDocs.add(doc.id()); }
+ if (randomBoolean()) {
+ engine.flush(randomBoolean(), true);
+ }
+ }
+ engine.flush();
+ globalCheckpoint.set(randomLongBetween(0, engine.getLocalCheckpoint()));
+ engine.syncTranslog();
+ final long minSeqNoToRetain;
+ try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
+ long safeCommitLocalCheckpoint = Long.parseLong(
+ safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+ minSeqNoToRetain = Math.min(globalCheckpoint.get() + 1 - retainedExtraOps, safeCommitLocalCheckpoint + 1);
+ }
- long localCheckpoint = engine.getLocalCheckpoint();
- globalCheckpoint.set(randomLongBetween(0, localCheckpoint));
- engine.getTranslog().sync();
- long keptIndex = globalCheckpoint.get() + 1 - retainedExtraOps;
 engine.forceMerge(true, 1, false, false, false);
 assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> {
- if (luceneOp.seqNo() >= keptIndex) {
+ if (luceneOp.seqNo() >= minSeqNoToRetain) {
 assertNotNull(luceneOp.getSource());
 assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
 }
 });
 Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService) .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
- for (long seqno = 0; seqno <= localCheckpoint; seqno++) {
+ for (long seqno = 0; seqno <= engine.getLocalCheckpoint(); seqno++) {
 String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]";
- if (seqno < keptIndex) {
+ if (seqno < minSeqNoToRetain) {
 Translog.Operation op = ops.get(seqno);
 if (op != null) {
 assertThat(op, instanceOf(Translog.Index.class));
@@ -1458,8 +1479,9 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc } settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
- globalCheckpoint.set(localCheckpoint);
- engine.getTranslog().sync();
+ engine.onSettingsChanged();
+ globalCheckpoint.set(engine.getLocalCheckpoint());
+ engine.syncTranslog();
 engine.forceMerge(true, 1, false, false, false);
 assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService, (luceneOp, translogOp) -> { assertEquals(translogOp.getSource().source, B_1);
@@ -4862,6 +4884,76 @@ private void assertOperationHistoryInLucene(List<Engine.Operation> operations) t
 }
 }
+ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
+ IOUtils.close(engine, store);
+ Settings.Builder settings = Settings.builder()
+ .put(defaultSettings.getSettings())
+ .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
+ .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
+ final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
+ final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
+ final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+ final List<Engine.Operation> operations = generateSingleDocHistory(true,
+
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300, "2");
+ Randomness.shuffle(operations);
+ Set<Long> existingSeqNos = new HashSet<>();
+ store = createStore();
+ engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get));
+ assertThat(engine.getMinRetainedSeqNo(), equalTo(0L));
+ long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo();
+ for (Engine.Operation op : operations) {
+ final Engine.Result result;
+ if (op instanceof Engine.Index) {
+ result = engine.index((Engine.Index) op);
+ } else {
+ result = engine.delete((Engine.Delete) op);
+ }
+ existingSeqNos.add(result.getSeqNo());
+ if (randomBoolean()) {
+ globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint()));
+ }
+ if (rarely()) {
+ settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
+ indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
+ engine.onSettingsChanged();
+ }
+ if (rarely()) {
+ engine.refresh("test");
+ }
+ if (rarely()) {
+ engine.flush(true, true);
+ assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)),
+ equalTo(engine.getMinRetainedSeqNo()));
+ }
+ if (rarely()) {
+ engine.forceMerge(randomBoolean());
+ }
+ try (Closeable ignored = engine.acquireRetentionLockForPeerRecovery()) {
+ long minRetainSeqNos = engine.getMinRetainedSeqNo();
+ assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1));
+ Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new);
+ Set<Long> actualOps = readAllOperationsInLucene(engine, createMapperService("test")).stream()
+ .map(Translog.Operation::seqNo).collect(Collectors.toSet());
+ assertThat(actualOps, containsInAnyOrder(expectedOps));
+ }
+ try (Engine.IndexCommitRef commitRef = engine.acquireSafeIndexCommit()) {
+ IndexCommit safeCommit = commitRef.getIndexCommit();
+ if (safeCommit.getUserData().containsKey(Engine.MIN_RETAINED_SEQNO)) {
+ lastMinRetainedSeqNo = Long.parseLong(safeCommit.getUserData().get(Engine.MIN_RETAINED_SEQNO));
+ }
+ }
+ }
+ if (randomBoolean()) {
+ engine.close();
+ } else {
+ engine.flushAndClose();
+ }
+ trimUnsafeCommits(engine.config());
+ try (InternalEngine recoveringEngine = new InternalEngine(engine.config())) {
+ assertThat(recoveringEngine.getMinRetainedSeqNo(), equalTo(lastMinRetainedSeqNo));
+ }
+ }
+
 private static void trimUnsafeCommits(EngineConfig config) throws IOException {
 final Store store = config.getStore();
 final TranslogConfig translogConfig = config.getTranslogConfig();
diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
index 04b70ea5c0de6..f7275b0032a67 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java
@@ -178,7 +178,7 @@ public void testDedupByPrimaryTerm() throws Exception { while ((op = snapshot.next()) != null) { assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo()))); }
- assertThat(snapshot.overriddenOperations(), equalTo(totalOps - latestOperations.size()));
+ assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size()));
 }
 }
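The new SoftDeletesPolicyTests that follow pin down the retention arithmetic. Read as a hedged sketch (illustrative pseudologic distilled from the assertions, not code from the patch), the invariant they exercise is:

    // The floor may only advance while no peer-recovery retention lock is outstanding,
    // and it never moves backwards.
    if (retentionLockCount == 0) {
        long floor = Math.min(localCheckpointOfSafeCommit, globalCheckpoint - retainedOperations) + 1;
        minRetainedSeqNo = Math.max(minRetainedSeqNo, floor);
    }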
diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java
new file mode 100644
index 0000000000000..f359010038284
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class SoftDeletesPolicyTests extends ESTestCase {
+ /**
+ * Makes sure we won't advance the retained seq# if the retention lock is held
+ */
+ public void testSoftDeletesRetentionLock() {
+ long retainedOps = between(0, 10000);
+ AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+ long safeCommitCheckpoint = globalCheckpoint.get();
+ SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 10000), retainedOps);
+ long minRetainedSeqNo = policy.getMinRetainedSeqNo();
+ List<Releasable> locks = new ArrayList<>();
+ int iters = scaledRandomIntBetween(10, 1000);
+ for (int i = 0; i < iters; i++) {
+ if (randomBoolean()) {
+ locks.add(policy.acquireRetentionLock());
+ }
+ // Advances the global checkpoint and the local checkpoint of a safe commit
+ globalCheckpoint.addAndGet(between(0, 1000));
+ safeCommitCheckpoint = randomLongBetween(safeCommitCheckpoint, globalCheckpoint.get());
+ policy.setLocalCheckpointOfSafeCommit(safeCommitCheckpoint);
+ if (rarely()) {
+ retainedOps = between(0, 10000);
+ policy.setRetentionOperations(retainedOps);
+ }
+ // Release some locks
+ List<Releasable> releasingLocks = randomSubsetOf(locks);
+ locks.removeAll(releasingLocks);
+ releasingLocks.forEach(Releasable::close);
+
+ // We only expose the seqno to the merge policy if the retention lock is not held.
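+ // (Presumably getRetentionQuery() is the merge policy's entry point into this class,
+ // so calling it here exercises the advance-or-hold decision before the assertion below.)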
+ policy.getRetentionQuery();
+ if (locks.isEmpty()) {
+ long retainedSeqNo = Math.min(safeCommitCheckpoint, globalCheckpoint.get() - retainedOps) + 1;
+ minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
+ }
+ assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
+ }
+
+ locks.forEach(Releasable::close);
+ long retainedSeqNo = Math.min(safeCommitCheckpoint, globalCheckpoint.get() - retainedOps) + 1;
+ minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
+ assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
+ }
+}
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index 1a70de53f5f17..b4cc033ab40c4 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -38,10 +38,12 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory;
+import org.elasticsearch.index.engine.EngineTestCase;
 import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.engine.VersionConflictEngineException;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard;
@@ -136,7 +138,9 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa }
 public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception {
- try (ReplicationGroup shards = createGroup(0)) {
+ // TODO: Enable this test with soft-deletes once we have timestamp support
+ Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
+ try (ReplicationGroup shards = createGroup(0, settings)) {
 shards.startAll();
 final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
 indexRequest.onRetry(); // force an update of the timestamp
@@ -239,18 +243,32 @@ public void testConflictingOpsOnReplica() throws Exception { * for primary and replica shards */
 public void testDocumentFailureReplication() throws Exception {
- final String failureMessage = "simulated document failure";
- final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory =
- new ThrowingDocumentFailureEngineFactory(failureMessage);
+ String failureMessage = "simulated document failure";
+ final EngineFactory failIndexingOpsEngine = new EngineFactory() {
+ @Override
+ public Engine newReadWriteEngine(EngineConfig config) {
+ return EngineTestCase.createInternalEngine((directory, writerConfig) ->
+ new IndexWriter(directory, writerConfig) {
+ @Override
+ public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
+ boolean isTombstone = false;
+ for (IndexableField field : doc) {
+ if (SeqNoFieldMapper.TOMBSTONE_NAME.equals(field.name())) {
+ isTombstone = true;
+ }
+ }
+ if (isTombstone) {
+ return super.addDocument(doc);
+ } else {
+ throw new IOException(failureMessage);
+ }
+ }
+ }, null, null, config);
+ }
+ };
 try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
 @Override
- protected EngineFactory
getEngineFactory(ShardRouting routing) {
- if (routing.primary()){
- return throwingDocumentFailureEngineFactory; // Simulate exception only on the primary.
- }else {
- return InternalEngine::new;
- }
- }}) {
+ protected EngineFactory getEngineFactory(ShardRouting routing) { return failIndexingOpsEngine; }}) {
 // test only primary
 shards.startPrimary();
@@ -370,8 +388,9 @@ public void testSeqNoCollision() throws Exception { recoverReplica(replica3, replica2); try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
- assertThat(snapshot.next(), equalTo(op2));
- assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
+ final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
+ expectedOps.add(op2);
+ assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
 assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
 }
 // TODO: We should assert the content of shards in the ReplicationGroup.
@@ -447,27 +466,6 @@ public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception { } }
- /** Throws documentFailure on every indexing operation */
- static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
- final String documentFailureMessage;
-
- ThrowingDocumentFailureEngineFactory(String documentFailureMessage) {
- this.documentFailureMessage = documentFailureMessage;
- }
-
- @Override
- public Engine newReadWriteEngine(EngineConfig config) {
- return InternalEngineTests.createInternalEngine((directory, writerConfig) ->
- new IndexWriter(directory, writerConfig) {
- @Override
- public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
- assert documentFailureMessage != null;
- throw new IOException(documentFailureMessage);
- }
- }, null, null, config);
- }
- }
-
 private static void assertNoOpTranslogOperationForDocumentFailure(
 Iterable<IndexShard> replicationGroup,
 int expectedOperation,
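A note on the replacement above: with soft-deletes, a document-level indexing failure appears to be recorded in Lucene as a no-op tombstone through the same addDocument path, so a failure-injecting writer must let tombstones through or the engine cannot record the failure at all (the old factory, which threw unconditionally, was removed for this reason). The tombstone check inlined in the test could be factored out roughly like this (an illustrative helper, not part of the patch):

    private static boolean isTombstone(Iterable<? extends IndexableField> doc) {
        for (IndexableField field : doc) {
            if (SeqNoFieldMapper.TOMBSTONE_NAME.equals(field.name())) {
                return true; // delete and no-op tombstones carry this marker field
            }
        }
        return false;
    }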
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index ee97ba14fe09e..bcde44832fcb8 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -98,7 +98,8 @@ public void testIndexingDuringFileRecovery() throws Exception { }
 public void testRecoveryOfDisconnectedReplica() throws Exception {
- try (ReplicationGroup shards = createGroup(1)) {
+ Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
+ try (ReplicationGroup shards = createGroup(1, settings)) {
 shards.startAll();
 int docs = shards.indexDocs(randomInt(50));
 shards.flush();
@@ -219,7 +220,8 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { @TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE")
 public void testRecoveryAfterPrimaryPromotion() throws Exception {
- try (ReplicationGroup shards = createGroup(2)) {
+ Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
+ try (ReplicationGroup shards = createGroup(2, settings)) {
 shards.startAll();
 int totalDocs = shards.indexDocs(randomInt(10));
 int committedDocs = 0;
@@ -231,6 +233,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { final IndexShard oldPrimary = shards.getPrimary(); final IndexShard newPrimary = shards.getReplicas().get(0); final IndexShard replica = shards.getReplicas().get(1);
+ boolean softDeleteEnabled = replica.indexSettings().isSoftDeleteEnabled();
 if (randomBoolean()) {
 // simulate docs that were inflight when primary failed, these will be rolled back
 final int rollbackDocs = randomIntBetween(1, 5);
@@ -267,6 +270,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { builder.settings(Settings.builder().put(newPrimary.indexSettings().getSettings()) .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
+ .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)
 );
 newPrimary.indexSettings().updateIndexMetaData(builder.build());
 newPrimary.onSettingsChanged();
@@ -276,9 +280,13 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { shards.syncGlobalCheckpoint(); assertThat(newPrimary.getLastSyncedGlobalCheckpoint(), equalTo(newPrimary.seqNoStats().getMaxSeqNo())); });
- newPrimary.flush(new FlushRequest());
+ newPrimary.flush(new FlushRequest().force(true));
 uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
 totalDocs += uncommittedOpsOnPrimary;
+ // we need an extra flush or refresh to advance the min_retained_seqno on the new primary so that ops-based recovery won't happen
+ if (softDeleteEnabled) {
+ newPrimary.flush(new FlushRequest().force(true));
+ }
 }
 if (randomBoolean()) {
@@ -298,7 +306,8 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs)); } else { assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
- assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary));
+ int expectedOps = softDeleteEnabled ? totalDocs : uncommittedOpsOnPrimary;
+ assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectedOps));
 }
 // roll back the extra ops in the replica
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 34020466659c2..d1a1986344955 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2787,7 +2787,13 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception { // Deleting a doc causes its memory to be freed from the breaker deleteDoc(primary, "_doc", "0");
- primary.sync(); // need to sync global checkpoint as the soft-deletes retention MergePolicy depends on it.
+ // Here we are testing that a fully deleted segment is dropped and its memory usage is freed.
+ // In order to instruct the merge policy not to keep a fully deleted segment,
+ // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
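+ // (Presumably this works because the policy's retention floor follows the safe commit's
+ // local checkpoint: the deleted doc only becomes reclaimable once a flush produces a
+ // safe commit that already contains the delete.)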
+ if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
+ primary.sync();
+ flushShard(primary);
+ }
 primary.refresh("force refresh");
 ss = primary.segmentStats(randomBoolean());
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index d3264451415d5..038cd4426b16d 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -411,12 +411,6 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE recoverySettings.getChunkSize().bytesAsInt(), Settings.EMPTY) {
-
- @Override
- boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException {
- return randomBoolean();
- }
-
 @Override
 public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
 phase1Called.set(true);
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 537409f35d175..6ece124daeb79 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -69,7 +69,7 @@ public void testTranslogHistoryTransferred() throws Exception { shards.addReplica(); shards.startAll(); final IndexShard replica = shards.getReplicas().get(0);
- assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(docs));
+ assertThat(getTranslog(replica).totalOperations(), equalTo(docs));
 }
 }
@@ -99,7 +99,7 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception { releaseRecovery.countDown(); future.get(); // rolling/flushing is async
- assertBusy(() -> assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(0)));
+ assertBusy(() -> assertThat(getTranslog(replica).totalOperations(), equalTo(0)));
 }
 }
@@ -113,9 +113,10 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { * - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained) * - index #2 * - index #5
- * - If flush and the translog retention disabled, delete #1 will be removed while index #0 is still retained and replayed.
+ * - If we flush and translog/Lucene retention is disabled, delete #1 will be removed while index #0 is still retained and replayed.
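+ * (with soft-deletes enabled, the equivalent trimming is exercised below by also dropping index.soft_deletes.retention.operations to 0)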
*/ - try (ReplicationGroup shards = createGroup(1)) { + Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10).build(); + try (ReplicationGroup shards = createGroup(1, settings)) { shards.startAll(); // create out of order delete and index op on replica final IndexShard orgReplica = shards.getReplicas().get(0); @@ -123,7 +124,7 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { // delete #1 orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL); - getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation + orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment // index #0 orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON)); @@ -143,16 +144,17 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { final int translogOps; if (randomBoolean()) { if (randomBoolean()) { - logger.info("--> flushing shard (translog will be trimmed)"); + logger.info("--> flushing shard (translog/soft-deletes will be trimmed)"); IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData()); builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings()) .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")); + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)); orgReplica.indexSettings().updateIndexMetaData(builder.build()); orgReplica.onSettingsChanged(); translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed). } else { - logger.info("--> flushing shard (translog will be retained)"); + logger.info("--> flushing shard (translog/soft-deletes will be retained)"); translogOps = 6; // 5 ops + seqno gaps } flushShard(orgReplica); @@ -167,7 +169,7 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { shards.recoverReplica(newReplica); shards.assertAllEqual(3); - assertThat(newReplica.estimateTranslogOperationsFromMinSeq(0), equalTo(translogOps)); + assertThat(getTranslog(newReplica).totalOperations(), equalTo(translogOps)); } } @@ -219,7 +221,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { shards.recoverReplica(newReplica); // file based recovery should be made assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - assertThat(newReplica.estimateTranslogOperationsFromMinSeq(0), equalTo(numDocs)); + assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs)); // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); @@ -323,7 +325,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { shards.recoverReplica(replica); // Make sure the flushing will eventually be completed (eg. 
`shouldPeriodicallyFlush` is false)
 assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
- assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(numDocs));
+ assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
 shards.assertAllEqual(numDocs);
 }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java
index 8c1b5a8691ba3..7cb39e280370d 100644
--- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java
@@ -1070,8 +1070,12 @@ public void testFilterCacheStats() throws Exception { assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "1").get().getResult()); assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "2").get().getResult());
+ // Here we are testing that a fully deleted segment is dropped and its cached entries are evicted.
+ // In order to instruct the merge policy not to keep a fully deleted segment,
+ // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
 if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
- persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
+ persistGlobalCheckpoint("index");
+ flush("index");
 }
 refresh();
 response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 206ea28ea05d4..a149ad3ef8d44 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -850,10 +850,15 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint(); final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations(); long maxSeqNo = Math.max(0, ((InternalEngine)engine).getLocalCheckpointTracker().getMaxSeqNo());
+ final long seqNoForRecovery;
+ try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
+ seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
+ }
+ final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
 for (Translog.Operation translogOp : translogOps.values()) {
 final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo());
 if (luceneOp == null) {
- if (globalCheckpoint + 1 - retainedOps <= translogOp.seqNo() && translogOp.seqNo() <= maxSeqNo) {
+ if (minSeqNoToRetain <= translogOp.seqNo() && translogOp.seqNo() <= maxSeqNo) {
 fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " + "retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]");
 } else {
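The tightened bound above reads as: an operation must still be discoverable in Lucene only if a recovering peer could still ask for it. A sketch of the arithmetic using the names from the hunk (an illustrative restatement, with safeCommitLocalCheckpoint standing in for the parsed commit user data):

    long seqNoForRecovery = safeCommitLocalCheckpoint + 1;   // first seq# a recovery may need to replay
    long minSeqNoToRetain = Math.min(seqNoForRecovery,
        globalCheckpoint + 1 - retainedOps);                 // the lower floor wins, so both recovery
                                                             // needs and the extra-ops window are honored
    // every translog op with minSeqNoToRetain <= seqNo <= maxSeqNo must also exist in Lucene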
diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index 70e8303043713..b9838968ad9b3 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -100,7 +100,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");
 protected ReplicationGroup createGroup(int replicas) throws IOException {
- IndexMetaData metaData = buildIndexMetaData(replicas);
+ return createGroup(replicas, Settings.EMPTY);
+ }
+
+ protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
+ IndexMetaData metaData = buildIndexMetaData(replicas, settings, indexMapping);
 return new ReplicationGroup(metaData);
 }
@@ -109,9 +113,14 @@ protected IndexMetaData buildIndexMetaData(int replicas) throws IOException { }
 protected IndexMetaData buildIndexMetaData(int replicas, Map<String, String> mappings) throws IOException {
+ return buildIndexMetaData(replicas, Settings.EMPTY, mappings);
+ }
+
+ protected IndexMetaData buildIndexMetaData(int replicas, Settings indexSettings, Map<String, String> mappings) throws IOException {
 Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
 .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
 .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(indexSettings)
 .build();
 IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName())
 .settings(settings)
@@ -203,6 +212,18 @@ public BulkItemResponse index(IndexRequest indexRequest) throws Exception { return listener.get(); }
+ public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception {
+ PlainActionFuture<BulkItemResponse> listener = new PlainActionFuture<>();
+ final ActionListener<BulkShardResponse> wrapBulkListener = ActionListener.wrap(
+ bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
+ listener::onFailure);
+ BulkItemRequest[] items = new BulkItemRequest[1];
+ items[0] = new BulkItemRequest(0, deleteRequest);
+ BulkShardRequest request = new BulkShardRequest(shardId, deleteRequest.getRefreshPolicy(), items);
+ new IndexingAction(request, wrapBulkListener, this).execute();
+ return listener.get();
+ }
+
 public synchronized void startAll() throws IOException {
 startReplicas(replicas.size());
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 6badc4fc9a1a8..1181dbec2fc56 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -610,11 +610,14 @@ protected void updateMappings(IndexShard shard, IndexMetaData indexMetadata) { }
 protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id) throws IOException {
+ final Engine.DeleteResult result;
 if (shard.routingEntry().primary()) {
- return shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL);
+ result = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL);
+ shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getEngine().getLocalCheckpoint());
 } else {
- return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id, VersionType.EXTERNAL);
+ result = shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L,
type, id, VersionType.EXTERNAL); } + return result; } protected void flushShard(IndexShard shard) {